【问题标题】:Kafka streams handle batch data to reset aggregationKafka 流处理批处理数据以重置聚合
【发布时间】:2020-09-10 11:23:02
【问题描述】:

我有一些数据到达我的 kafka 主题“数据源”,其架构如下(此处为演示而简化):

{ "deal" : -1, "location": "", "value": -1, "type": "init" }
{ "deal": 123456, "location": "Mars", "value": 100.0, "type": "batch" },
{ "deal" 123457, "location": "Earth", "value", 200.0, "type": "batch" },
{ "deal": -1, "location": "", "value", -1, "type": "commit" }

此数据来自批量运行,我们获取所有交易并重新计算其价值。把它想象成一个开始的过程——此时,这里有一组所有位置的新数据。 此时init和commit消息没有发送到真正的topic,它们被producer过滤掉了。

白天,随着事情的变化,会有更新。这提供了新数据(在此示例中,我们可以忽略覆盖数据,因为这将通过重新运行批处理来处理):

{ "deal": 123458, "location", "Mars", "value": 150.0, "type": "update" }

此数据作为 KStream“位置”进入应用程序。


另一个主题“位置”有一个可能位置的列表。这些作为 KGlobalTable 位置被拉入 java kafka-streams 应用程序:

{ "id": 1, "name": "Mars" },
{ "id": 2, "name": "Earth"}

计划是使用 java 9 kafka-streams 应用程序来聚合这些值,按位置分组。输出应该类似于:

{ "id": 1, "location": "Earth", "sum": 250.0 },
{ "id": 2, "location": "Mars": "sum": 200.0 }

这就是我迄今为止所做的工作:

StreamsBuilder builder = new StreamsBuilder();

/** snip creating serdes, settings up stores, boilerplate  **/

final GlobalKTable<Integer, Location> locations = builder.globalTable(
                LOCATIONS_TOPIC, 
                /* serdes, materialized, etc */
                );

final KStream<Integer, PositionValue> positions = builder.stream(
                POSITIONS_TOPIC,
                /* serdes, materialized, etc */
            );

/* The real thing is more than just a name, so a transformer is used to match locations to position values, and filter ones that we don't care about */
KStream<Location, PositionValue> joined = positions
                .transform(() -> new LocationTransformer(), POSITION_STORE) 
                .peek((location, positionValue) -> { 
                    LOG.debugv("Processed position {0} against location {1}", positionValue, location);
                });

/** This is where it is grouped and aggregated here **/
joined.groupByKey(Grouped.with(locationSerde, positionValueSerde))
            .aggregate(Aggregation::new, /* initializer */
                       (location, positionValue, aggregation) -> aggregation.updateFrom(location, positionValue), /* adder */
                Materialized.<Location, Aggregation>as(aggrStoreSupplier)
                    .withKeySerde(locationSerde)
                    .withValueSerde(aggregationSerde)
            );

Topology topo = builder.build();

我遇到的问题是,这是汇总所有内容 - 所以每日批次,加上更新,然后是下一个每日批次,都被添加。基本上,我需要一种方式来表示“这是下一组批处理数据,对此进行重置”。我不知道该怎么做 - 请帮忙!

谢谢

【问题讨论】:

    标签: java apache-kafka apache-kafka-streams


    【解决方案1】:

    因此,如果我理解正确,您希望汇总数据,但仅针对最后一天,并丢弃其余数据。

    我建议您聚合成一个中间类,该类包含流中的所有值,并且还具有过滤掉其他日子数据的逻辑。如果我理解正确,那将丢弃最后一个“批处理”类型之前的所有数据。

    虽然在 Kotlin 中,我已经做了一个 similar solution,如果需要,可以查看。

    【讨论】:

    • 您确实理解正确。谢谢,我会看看这个,看看它是否符合我的需要。我仍然习惯于流和拓扑?
    • 我的示例中的代码将计算主题上有多少 STARTED 类型的 ProcessingEvents,但它会丢弃任何 FINISHED 类型的 ProcessingEvents。它将数据收集到一个名为 ProcessingEventDto 的中间类中,该类具有确定 ProcessingEvent 是否完成的逻辑,并负责提供它被启动的次数
    • 谢谢 - 我最终标记了一个位置以指示何时需要重置,并在聚合器中使用它:``` if (measurement.dealNum == PositionValue.RESET_FLAG) return null;否则返回 aggregate.updateFrom(limit, measure); ```
    【解决方案2】:

    您可以做一些事情,但我建议您使用 TimeWindowed Stream。您可以将时间设置为 1 天的滚动窗口,并在该流上执行代理。您最终将在 KTable 中自己的窗口中汇总每一天。这样您就不必担心丢弃数据(尽管您可以),并且每天都会分开。

    这里有几个很好的例子来说明它们是如何工作的:https://www.programcreek.com/java-api-examples/?api=org.apache.kafka.streams.kstream.TimeWindows

    【讨论】:

    • 我曾考虑过这一点,但一天并不总是在同一时间点。所以今天它可以在凌晨 2 点、明天 2.05 点等重置。我认为重叠可能会导致问题。有没有程序化窗口之类的东西?
    • 还有会话窗口,它们根据两个记录之间的时间间隔将记录分组。如果数据是成批的,这可能在 2.7 中对你有用,还会有滑动窗口,在每个窗口中保持相同数量的记录。虽然我不太认为这也能满足您的需求
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-02-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多