【问题标题】:Try to use sklearn library in Spark Structured Streaming尝试在 Spark Structured Streaming 中使用 sklearn 库
【发布时间】:2018-05-11 22:15:43
【问题描述】:

我想将 sklearn.preprocessing 的标签编码器功能应用于使用 Kafka 和 Spark 结构化流的流数据。到目前为止的想法是:

当我每次从 Kafka 源接收一批数据时,我想在该批上实现如下功能:

def use_label_encoder(label_encoder, y):
   return label_encoder.transform(y) + 1

to_transform_class_val = udf(use_label_encoder, IntegerType())

这是架构:

schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])

df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))

当我尝试定义 label_encoder 时:

label_encoder = enc.fit(df1.select(to_upper("json.class")))

它给出错误“输入形状错误”

我用于非流式数据的等效代码是:

y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1

谁能帮助我了解如何将 sklearn 方法应用于流数据?

【问题讨论】:

    标签: machine-learning scikit-learn pyspark apache-kafka


    【解决方案1】:

    以后可以加1吗?你的火花代码会变成

    def use_label_encoder(label_encoder, y):
        return label_encoder.transform(y)
    
    to_transform_class_val = udf(use_label_encoder, IntegerType())
    
    df = df.withColumn('new_col', to_transform_class_val(label_encoder, 'old_column'))
    df = df.withColumn('label_enc', col('new_col') + lit(1))
    

    【讨论】:

    • use_label_encoder 方法有 2 个参数,但调用该方法时只传递一个参数
    • 错过了。在答案中纠正。
    • 但我无法定义标签编码器本身。从问题中提取:当我尝试定义 label_encoder: label_encoder = enc.fit(df1.select(to_upper("json.class"))) 它给出一个错误“bad input shape”
    猜你喜欢
    • 2020-03-19
    • 2020-09-12
    • 1970-01-01
    • 2020-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-29
    • 2022-01-05
    相关资源
    最近更新 更多