【问题标题】:Structured Streaming process multiple query?结构化流处理多个查询?
【发布时间】:2018-02-17 01:55:12
【问题描述】:

我使用 Spark Streaming 来处理在线需求,例如每小时的新用户数,如下所示:

每批,当log到来时,从hbase或dynamodb等外部表中选择uid,如果不存在则插入表

这种方法使用表的频率太高,花费太多。

现在我想使用结构化流来解决这个问题。

下面的sql可以离线解决问题:

sql1

create table event_min_table as select pageid,uid,floor(min(time)/36000)*3600 as event_time from event_table group by pageid,uid

sql2

select pageid,count(distinct uid) as cnt from event_min_table group by pageid,event_time

由于我对结构化流不熟悉,结构化流不支持多重聚合,所以我是这样使用的:

  1. readStream 创建查询为sql1 然后在内存中注册为表,输出模式为complete

  2. 从使用sql2的表创建查询,输出格式为update,保存到外部表,如hbase或dynamodb

我不知道我的方法能否解决问题,但我有几个问题:

  1. 如果我在complete输出模式下创建内存表,随着时间的推移数据会变大吗?

  2. 即使这样也可以,但是每次日志来的时候结果是否输出,所以问题还是没有解决,我的目标是减少对外部表的请求,例如hbase或dynamodb

【问题讨论】:

  • “但我有几个问题” 一次只有一个问题(根据 StackOverflow 规则)。我仍然不清楚你在问什么。

标签: apache-spark spark-structured-streaming


【解决方案1】:

1)如果我创建一个内存表作为完整输出模式,数据会随着时间的推移而变大吗?

我不这么认为(请参阅the code)。

我的目标是减少对外部表的请求

您可以使用KeyValueGroupedDataset.flatMapGroupsWithState 运算符完全控制状态存储中保留的内容、时间和时长:

flatMapGroupsWithState 将给定函数应用于每组数据,同时保持用户定义的每组状态。结果数据集将表示函数返回的对象。对于静态批处理数据集,该函数将每组调用一次。对于流式数据集,函数将在每个触发器中为每个组重复调用,并且每个组的状态更新将在调用之间保存。

这是您在结构化流中可以对过去和当前数据集进行的最大控制。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-23
    • 2018-12-03
    • 1970-01-01
    • 2019-08-24
    • 1970-01-01
    • 2019-12-13
    相关资源
    最近更新 更多