【问题标题】:how to write.stream in append mode如何在附加模式下 write.stream
【发布时间】:2020-06-16 14:23:12
【问题描述】:

当没有水印的流数据帧/数据集上存在流聚合时,不支持获取错误输出模式。我想在控制台上输出。

class StructSpark:
  def __init__(self, address, port):
    self.address = address
    self.port = port
    self.spark = SparkSession.builder.appName("StructuredWordcount").getOrCreate()
def getonline(self):
    lines = self.spark.readStream.format('socket').option('host', self.address).option('port', self.port).option(
        'includeTimestamp', 'true').load()
    words = lines.select(split(lines.value, ',').alias("value"), lines.timestamp)
    words1 = words.select((split(words.value[0], ',')).alias("key"),(split(words.value[0], ',')).alias("value"), lines.timestamp)
    windowedCount = words1.withWatermark("timestamp", "10 minutes").groupBy(window(words1.timestamp, "5 minutes", "5 minutes"),words1.key).count()
    windowedCount.createOrReplaceTempView("updates")
    count = self.spark.sql("select * from updates where count > 1")
    with open('/home/vaibhav/Desktop/data.txt', 'a') as file:
        file.write(str(count))
    query = count.writeStream.outputMode("Append").format("console").start()
    query.awaitTermination()

【问题讨论】:

  • 请正确格式化代码并添加一些文字说明它的问题以及您想要实现的目标。

标签: python apache-spark pyspark spark-streaming


【解决方案1】:

由于您在 dstream 中执行聚合操作,因此您无法在附加模式下执行 write.stream。要么在“完成”模式下使用它,要么在聚合操作之前执行 write.stream。

【讨论】:

    猜你喜欢
    • 2018-09-06
    • 1970-01-01
    • 1970-01-01
    • 2018-07-01
    • 2013-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多