[Posted]: 2020-06-16 14:23:12
[Problem description]:
I'm getting the error "Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark". I want to print the output to the console.
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, window


class StructSpark:
    def __init__(self, address, port):
        self.address = address
        self.port = port
        self.spark = SparkSession.builder.appName("StructuredWordcount").getOrCreate()

    def getonline(self):
        # Read lines from the socket source, attaching an ingestion timestamp.
        lines = (self.spark.readStream.format('socket')
                 .option('host', self.address)
                 .option('port', self.port)
                 .option('includeTimestamp', 'true')
                 .load())
        words = lines.select(split(lines.value, ',').alias("value"), lines.timestamp)
        words1 = words.select(split(words.value[0], ',').alias("key"),
                              split(words.value[0], ',').alias("value"),
                              words.timestamp)
        # Watermark on the same event-time column used in the window.
        windowedCount = (words1.withWatermark("timestamp", "10 minutes")
                         .groupBy(window(words1.timestamp, "5 minutes", "5 minutes"),
                                  words1.key)
                         .count())
        windowedCount.createOrReplaceTempView("updates")
        count = self.spark.sql("select * from updates where count > 1")
        # Note: this only writes the DataFrame's repr, not the streaming results.
        with open('/home/vaibhav/Desktop/data.txt', 'a') as file:
            file.write(str(count))
        query = count.writeStream.outputMode("Append").format("console").start()
        query.awaitTermination()
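For context, the grouping that `window(words1.timestamp, "5 minutes", "5 minutes")` performs (a 5-minute tumbling window, since slide equals width) can be sketched in plain Python without Spark; the event times and keys below are made-up examples:

```python
from collections import Counter
from datetime import datetime, timedelta

def tumbling_window_counts(events, width=timedelta(minutes=5)):
    """Count (window_start, key) pairs, mimicking a 5-minute tumbling window."""
    counts = Counter()
    for ts, key in events:
        # Floor the timestamp to the start of its window.
        epoch = ts.timestamp()
        start = datetime.fromtimestamp(epoch - epoch % width.total_seconds())
        counts[(start, key)] += 1
    return counts

events = [
    (datetime(2020, 6, 16, 14, 1), "a"),
    (datetime(2020, 6, 16, 14, 3), "a"),
    (datetime(2020, 6, 16, 14, 7), "b"),
]
# "a" appears twice in the 14:00-14:05 window, "b" once in 14:05-14:10.
print(tumbling_window_counts(events))
```

What Spark adds on top of this sketch is the watermark: with `withWatermark("timestamp", "10 minutes")` and append mode, a window's count is only emitted once the watermark passes the end of that window, which is why append mode refuses to run an aggregation that has no watermark at all.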
[Comments]:
- Please format the code properly and add some text explaining what is wrong with it and what you want to achieve.
Tags: python apache-spark pyspark spark-streaming