【发布时间】:2018-05-11 22:15:43
【问题描述】:
我想将 sklearn.preprocessing 的标签编码器功能应用于使用 Kafka 和 Spark 结构化流的流数据。到目前为止的想法是:
当我每次从 Kafka 源接收一批数据时,我想在该批上实现如下功能:
def use_label_encoder(label_encoder, y):
return label_encoder.transform(y) + 1
to_transform_class_val = udf(use_label_encoder, IntegerType())
这是架构:
schema = StructType([
StructField("sepal_length_in_cm", StringType()), \
StructField("sepal_width_in_cm", StringType()), \
StructField("petal_length_in_cm", StringType()), \
StructField("petal_width_in_cm", StringType()), \
StructField("class", StringType())
])
df = df.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_json(df.value, schema).alias("json"))
当我尝试定义 label_encoder 时:
label_encoder = enc.fit(df1.select(to_upper("json.class")))
它给出错误“输入形状错误”
我用于非流式数据的等效代码是:
y = df['class'].values
enc = LabelEncoder()
label_encoder = enc.fit(y)
y = label_encoder.transform(y) + 1
谁能帮助我了解如何将 sklearn 方法应用于流数据?
【问题讨论】:
标签: machine-learning scikit-learn pyspark apache-kafka