【发布时间】:2018-01-18 23:38:22
【问题描述】:
我正在尝试使用两个不同的窗口聚合流并将其打印到控制台中。但是,仅打印第一个流式查询。 tenSecsQ 不会打印到控制台中。
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCountWindowed")
.config("spark.master", "local[*]")
.getOrCreate();
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
.load();
Dataset<Row> words = lines
.as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
.toDF("word", "timestamp");
// 5 second window
Dataset<Row> fiveSecs = words
.groupBy(
functions.window(words.col("timestamp"), "5 seconds"),
words.col("word")
).count().orderBy("window");
// 10 second window
Dataset<Row> tenSecs = words
.groupBy(
functions.window(words.col("timestamp"), "10 seconds"),
words.col("word")
).count().orderBy("window");
触发 5s 和 10s 聚合流的流式查询。不打印 10 秒流的输出。只有 5s 打印到控制台
// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
.queryName("5_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
.queryName("10_secs")
.outputMode("complete")
.format("console")
.option("truncate", "false")
.start();
tenSecsQ.awaitTermination();
【问题讨论】:
-
实际上,我不知道套接字流是如何工作的,但对我来说,您的第一个 Spark 流似乎从套接字流中读取了所有数据,而第二个则什么也没有。
-
尝试设置检查点位置。可能,他们使用相同的检查点位置,结果第一个查询会读取所有消息。
标签: apache-spark spark-structured-streaming