【发布时间】:2020-01-11 22:26:29
【问题描述】:
我使用 Spark 2.4.3 和 Kafka 2.3.0。我想使用从 Kafka 到 Spark 的数据进行 Spark 结构化流式传输。一般来说,它确实在测试模式下工作,但由于我必须对数据进行一些处理(并且不知道另一种方法),Spark 数据帧不再具有流式传输功能。
#!/usr/bin/env python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
# create schema for data
schema = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])
# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()
# create DataFrame representing the stream
dsraw = spark.readStream \
.format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test")
print("dsraw.isStreaming: ", dsraw.isStreaming)
# Convert Kafka stream to something readable
ds = dsraw.selectExpr("CAST(value AS STRING)")
print("ds.isStreaming: ", ds.isStreaming)
# Do query on the converted data
dsQuery = ds.writeStream.queryName("ds_query").format("memory").start()
df1 = spark.sql("select * from ds_query")
print("df1.isStreaming: ", df1.isStreaming)
# convert json into spark dataframe cols
df2 = df1.withColumn("value", from_json("value", schema))
print("df2.isStreaming: ", df2.isStreaming)
输出是:
dsraw.isStreaming: True
ds.isStreaming: True
df1.isStreaming: False
df2.isStreaming: False
因此,当我创建第一个数据帧时,我失去了流媒体功能。我怎样才能避免它?如何从流中获取流式 Spark 数据帧?
【问题讨论】:
-
我不明白你的问题。
spark.sql()不创建流。你能解释一下你想做什么吗? -
在后面的步骤中,我想对数据帧进行一些处理。我猜这些数据帧应该是流式的,我理解为未绑定数据帧的同义词。还是我错了?
-
您需要将 DF 创建逻辑放在 sink 'writestream' 之前,使其成为流式 DF。如果只想对流数据应用一些业务逻辑,也可以使用 writeStream.foreachBatch。
-
我找不到在 readstream 和 before writestream 之后执行 DF 的示例。我还想知道创建一个我可以使用 sql 访问的 writestream,然后执行 DF 的东西。
标签: apache-spark pyspark apache-kafka apache-spark-sql spark-streaming