【问题标题】:pyspark streaming restore from checkpointpyspark 从检查点流式还原
【发布时间】:2017-07-04 07:32:15
【问题描述】:

我使用启用了检查点的 pyspark 流。 第一次启动成功,但重新启动时崩溃并出现错误:

INFO scheduler.DAGScheduler:ResultStage 6 (runJob at PythonRDD.scala:441) 在 1,160 秒内失败,原因是 Job 由于阶段故障而中止:阶段 6.0 中的任务 0 失败 4 次,最近一次失败:阶段中丢失任务 0.3 6.0(TID 86,h-1.e-contenta.com,执行程序 2):org.apache.spark.api.python.PythonException: 回溯(最近一次通话最后): 文件“/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py”,第 163 行,在 main func,分析器,反序列化器,序列化器 = read_command(pickleSer,infile) 文件“/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/worker.py”,第 56 行,在 read_command command = serializer.loads(command.value) 文件“/data1/yarn/nm/usercache/appcache/application_1481115309392_0229/container_1481115309392_0229_01_000003/pyspark.zip/pyspark/serializers.py”,第 431 行,在负载中返回 pickle.loads(obj, encoding=encoding) ImportError: 没有模块名为 ...

通过 spark 上下文 addPyFile() 添加的 Python 模块

def create_streaming():
"""
Create streaming context and processing functions
:return: StreamingContext
"""
sc = SparkContext(conf=spark_config)
zip_path = zip_lib(PACKAGES, PY_FILES)
sc.addPyFile(zip_path)
ssc = StreamingContext(sc, BATCH_DURATION)

stream = KafkaUtils.createStream(ssc=ssc, zkQuorum=','.join(ZOOKEEPER_QUORUM),
                                      groupId='new_group',
                                      topics={topic: 1})

stream.checkpoint(BATCH_DURATION)
stream = stream \
    .map(lambda x: process(ujson.loads(x[1]), geo_data_bc_value)) \
    .foreachRDD(lambda_log_writer(topic, schema_bc_value))

ssc.checkpoint(STREAM_CHECKPOINT)
return ssc

if __name__ == '__main__':
ssc = StreamingContext.getOrCreate(STREAM_CHECKPOINT, lambda: create_streaming())
ssc.start()
ssc.awaitTermination()

【问题讨论】:

  • 你在哪里设置 ssc.addPyFile?在 ssc.getOrCreate 中还是在 ssc.getOrCreate 之后?
  • 在返回流式上下文的方法中:
  • 尝试在 ssc = StreamingContext.getOrCreate 之后设置额外的 ssc.addPyFile
  • 只有 SparkContext 有方法 addPyFile,StreamingContext 没有。添加代码示例

标签: pyspark spark-streaming


【解决方案1】:

对不起,这是我的错。

试试这个:

if __name__ == '__main__':
    ssc = StreamingContext.getOrCreate('', None)
    ssc.sparkContext.addPyFile()

    ssc.start()
    ssc.awaitTermination()

【讨论】:

  • 非常感谢,这真的很有帮助!虽然这对我来说仍然是一个谜,为什么这在创建火花上下文时不起作用......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-02-09
  • 1970-01-01
  • 1970-01-01
  • 2018-08-26
  • 1970-01-01
  • 2016-02-16
相关资源
最近更新 更多