【问题标题】:Using Spark SQL with Spark Streaming将 Spark SQL 与 Spark 流结合使用
【发布时间】:2019-03-03 11:39:05
【问题描述】:

尝试在 Spark 结构化流方面理解 SparkSql。
Spark Session 从 kafka 主题中读取事件,将数据聚合到按不同列名分组的计数并将其打印到控制台。
原始输入数据结构如下:

+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
|.  sourceTypes|                Guid|  platform|datacenter|pagesId|     eventTimestamp|              Id1234|  Id567890|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+----------+
| Notififcation|....................|   ANDROID|       dev|     aa|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|   ANDROID|       dev|     ab|2018-09-27 09:41:29|fce81f05-a085-392...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:46|0ee089c1-d5da-3b3...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|57c18964-40c9-311...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:48|5ecf1d77-321a-379...|{"id":...|
| Notififcation|....................|     WEBOS|       dev|     aa|2018-09-27 09:42:52|d9fc4cfa-0934-3e9...|{"id":...|
+--------------+--------------------+----------+----------+-------+-------------------+--------------------+---------+

sourceTypesplatformdatacenterpageId 需要计数。

使用以下代码聚合数据:

Dataset<Row> query = sourceDataset
        .withWatermark("eventTimestamp", watermarkInterval)
        .select(
            col("eventTimestamp"),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .groupBy(
            window(col("eventTimestamp"), windowInterval),
            col("datacenter"),
            col("platform"),
            col("pageId")
        )
        .agg(
            max(col("eventTimestamp"))
        );

这里是watermarkInterval=45secondswindowInterval=15secondstriggerInterval=15seconds

使用新的聚合数据集:

aggregatedDataset
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("console")
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .start();

有几个问题:

  1. 输出数据未打印每个 groupBy 的计数,例如平台、pageId 等。

  2. 如何以 json 格式打印输出?我尝试在控制台输出数据时使用select(to_json(struct("*")).as("value")),但它不起作用。

【问题讨论】:

    标签: java apache-spark apache-spark-sql spark-streaming spark-structured-streaming


    【解决方案1】:

    您可以使用以下代码 sn-p 解决您的问题:

    .outputMode("complete")
    

    【讨论】:

      猜你喜欢
      • 2018-01-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-09
      • 1970-01-01
      • 2019-10-30
      • 2018-05-27
      相关资源
      最近更新 更多