【问题标题】:Spark Streaming: How to load a Pipeline on a Stream?Spark Streaming:如何在流上加载管道?
【发布时间】:2016-09-29 17:33:09
【问题描述】:

我正在实现一个用于流处理的 lambda 架构系统。

我在 Spark Ba​​tch 中使用 GridSearch 创建管道没有问题:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])

paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()

cv = CrossValidator(estimator=pipeline,
                estimatorParamMaps=paramGrid,
                evaluator=BinaryClassificationEvaluator(),
                numFolds=4)

pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")

但是,我似乎无法找到如何将管道插入 Spark Streaming Process。我使用 kafka 作为 DStream 源,我的代码如下:

import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils

从 pyspark.streaming 导入 StreamingContext

ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc,  "localhost:2181", "spark-    streaming-consumer", {"kafka_topic": 1})

model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))

CODE MISSING GOES HERE    

ssc.start()
ssc.awaitTermination()

现在我需要找到一些方法

根据文档here(尽管它看起来非常过时),您的模型似乎需要实现predict 方法才能在 rdd 对象上使用它(希望在 kafkastream 上使用它?)

如何在 Streaming 上下文中使用管道?重新加载的 PipelineModel 似乎只实现了transform

这是否意味着在 Streaming 上下文中使用批处理模型的唯一方法是使用纯模型而不是管道?

【问题讨论】:

    标签: apache-spark pyspark spark-streaming apache-spark-mllib


    【解决方案1】:

    我找到了一种将 Spark Pipeline 加载到 Spark Streaming 中的方法。

    此解决方案适用于 Spark v2.0,因为后续版本可能会实现更好的解决方案。

    我找到的解决方案是使用toDF() 方法将流式RDD 转换为Dataframe,然后您可以在其中应用pipeline.transform 方法。

    不过,这种做事方式非常低效。

    # we load the required libraries
    from pyspark.sql.types import (
            StructType, StringType, StructField, LongType
            )
    from pyspark.sql import Row
    from pyspark.streaming.kafka import KafkaUtils
    
    #we specify the dataframes schema, so spark does not have to do reflections on the data.
    
    pipeline_schema = StructType(
        [
            StructField("field1",StringType(),True),
            StructField("field2",StringType(),True),
            StructField("field3", LongType(),True)
     ]
    )
    
    #We load the pipeline saved with spark batch
    pipeline = PipelineModel.load('/pipeline')
    
    #Setup usual spark context, and spark Streaming Context
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 1)
    
    #On my case I use kafka directKafkaStream as the DStream source
    directKafkaStream = KafkaUtils.createDirectStream(ssc, suwanpos[QUEUE_NAME], {"metadata.broker.list": "localhost:9092"})
    
    def handler(req_rdd):
        def process_point(p):
            #here goes the logic to  do after applying the pipeline
            print(p)   
        if req_rdd.count()  > 0:
            #Here is the gist of it, we turn the rdd into a Row, then into a df with the specified schema)
            req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema)
            #Now we can apply the transform, yaaay
            pred = pipeline.transform(req_df)
            records = pred.rdd.map(lambda p: process_point(p)).collect()
    

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 2016-03-12
      • 2017-12-31
      • 2016-05-04
      • 1970-01-01
      • 2016-12-16
      • 1970-01-01
      • 2016-12-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多