【问题标题】:Executing separate streaming queries in spark structured streaming在 Spark 结构化流中执行单独的流查询
【发布时间】:2018-01-18 23:38:22
【问题描述】:

我正在尝试使用两个不同的窗口聚合流并将其打印到控制台中。但是,仅打印第一个流式查询。 tenSecsQ 不会打印到控制台中。

SparkSession spark = SparkSession
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
         functions.window(words.col("timestamp"), "5 seconds"),
         words.col("word")
    ).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
          functions.window(words.col("timestamp"), "10 seconds"),
          words.col("word")
    ).count().orderBy("window");

触发 5s 和 10s 聚合流的流式查询。不打印 10 秒流的输出。只有 5s 打印到控制台

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start writeStream() for 10s window
StreamingQuery tenSecsQ = tenSecs.writeStream()
    .queryName("10_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

tenSecsQ.awaitTermination();

【问题讨论】:

  • 实际上,我不知道套接字流是如何工作的,但对我来说,您的第一个 Spark 流似乎从套接字流中读取了所有数据,而第二个则什么也没有。
  • 尝试设置检查点位置。可能,他们使用相同的检查点位置,结果第一个查询会读取所有消息。

标签: apache-spark spark-structured-streaming


【解决方案1】:

我一直在调查这个问题。

总结:Structured Streaming 中的每个查询都会使用 source 数据。套接字源为定义的每个查询创建一个新连接。在这种情况下看到的行为是因为nc 仅将输入数据传送到第一个连接。

从今以后,除非我们能够确保连接的套接字源向每个打开的连接传递相同的数据,否则不可能在套接字连接上定义多个聚合。


我在 Spark 邮件列表中讨论了这个问题。 Databricks 开发者朱世雄回答:

Spark 为每个查询创建一个连接。您观察到的行为是因为“nc -lk”的工作原理。如果您使用netstat 来检查 tcp 连接,您将在启动两个查询时看到有两个连接。但是,“nc”只将输入转发到一个连接。

我通过定义一个小实验来验证这种行为: 首先,我创建了一个SimpleTCPWordServer,它为每个打开的连接提供随机单词,以及一个声明两个查询的基本结构化流作业。它们之间的唯一区别是第二个查询定义了一个额外的常量列来区分其输出:

val lines = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "9999")
    .option("includeTimestamp", true)
    .load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("7 seconds"))
  .start()

如果 StructuredStreaming 只消耗一个流,那么我们应该会看到两个查询传递相同的单词。如果每个查询都消耗一个单独的流,那么每个查询都会报告不同的单词。

这是观察到的输出:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
|   value|          timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+

+------+-------------------+---+
| value|          timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
|  value|          timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
|   bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|     value|          timestamp|
+----------+-------------------+
|    breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
|   amazing|2017-08-14 13:54:52|
|    bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
|     asset|2017-08-14 13:54:54|
|      cell|2017-08-14 13:54:54|
+----------+-------------------+

我们可以清楚地看到每个查询的流是不同的。看起来不可能对socket source 传递的数据定义多个聚合,除非我们可以保证 TCP 后端服务器向每个打开的连接传递完全相同的数据。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-06
    • 2018-08-11
    • 1970-01-01
    • 2017-05-04
    • 2018-06-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多