【发布时间】:2016-09-29 17:33:09
【问题描述】:
我正在实现一个用于流处理的 lambda 架构系统。
我在 Spark Batch 中使用 GridSearch 创建管道没有问题:
pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])
paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=4)
pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")
但是,我似乎无法找到如何将管道插入 Spark Streaming Process。我使用 kafka 作为 DStream 源,我的代码如下:
import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
从 pyspark.streaming 导入 StreamingContext
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1})
model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))
CODE MISSING GOES HERE
ssc.start()
ssc.awaitTermination()
现在我需要找到一些方法
根据文档here(尽管它看起来非常过时),您的模型似乎需要实现predict 方法才能在 rdd 对象上使用它(希望在 kafkastream 上使用它?)
如何在 Streaming 上下文中使用管道?重新加载的 PipelineModel 似乎只实现了transform
这是否意味着在 Streaming 上下文中使用批处理模型的唯一方法是使用纯模型而不是管道?
【问题讨论】:
标签: apache-spark pyspark spark-streaming apache-spark-mllib