【发布时间】:2020-02-22 17:57:10
【问题描述】:
我最近在流应用程序中遇到了一个我以前没有遇到过的问题,并且很难追踪与键控/连接(以及更新后的分区)相关的问题。
我有两个主题(raw_events 和 processes_users),它们的键控相同,但是当我尝试对这两个主题执行连接时,只有 一些 连接成功,尽管键控相同。
工作流程(用于上下文)
为简洁起见,应用程序的基本工作流程如下:
- 数据通过生产者流入
raw_event主题。 - 一系列流应用程序侦听
raw_event主题并根据一系列业务规则(例如IP 地址、用户等)从中提取各种实体 - 从
raw_event主题中识别的实体被放入preprocessing_{type}主题中,其中包含有关提取的元数据和在raw_event中找到的相关信息(例如,对于用户而言,这可能是帐户名称、电子邮件等)。 这些主题中的项目由raw_event键入。 - 另一系列流应用程序将侦听各种
preprocessing_{type}主题,并与一系列GlobalKTables 相结合,这些GlobalKTables 代表该给定实体final_{type}的所有已知实例。对于成功的连接,来自final_{type}的实例将使用来自raw_event/preprocessing_{type}主题的新信息进行丰富;不成功的连接将指示给定类型的新实体,然后将其键入并放入final_{type}主题。preprocessing_{type}的所有丰富实例都插入到processing_{type}主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是 -processed_{type}主题中的项目仍由raw_event键入。 - 最后,一个流应用程序运行并尝试通过加入
processing_{type}来丰富来自raw_event的原始实例,这将是相同的键,并使用来自丰富实体的各种信息丰富raw_event实例,在将其推送到final_event主题之前。
问题
问题本身出现在上面的第 5 步(事件丰富)中,因为只有 raw_event 主题和 processing_users 主题之间的部分连接按预期工作。
使用通过整个管道的 24 条记录的子集,主题中的 24 对中只有 5 对成功加入。有效的那些似乎是相同的一致的,但我在数据中看不到任何表明为什么一个有效而另一个无效:
raw_event keys processing_user keys
mawjuG0B9k3AiALz0_2S 0q0juG0B9k3AiALz8ApP
xEEcv20B9k3AiALzEN0m m60juG0B9k3AiALz5gU5
zqwjuG0B9k3AiALzz_tg ua0juG0B9k3AiALz7wqa
v60juG0B9k3AiALz6Aal xEEcv20B9k3AiALzEN0m
0q0juG0B9k3AiALz8ApP zqwjuG0B9k3AiALzz_tg
RK0juG0B9k3AiALz5QUw zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal Ta0juG0B9k3AiALz5QUw
8KwjuG0B9k3AiALz1v58 RKwjuG0B9k3AiALz1P7C
c60juG0B9k3AiALz5gU4 -60juG0B9k3AiALz3gGn
RKwjuG0B9k3AiALz1P7C Va0juG0B9k3AiALz5QUw
zK0juG0B9k3AiALz6Aal 560juG0B9k3AiALz3QGh
Ta0juG0B9k3AiALz5QUw mawjuG0B9k3AiALz0_2S
Va0juG0B9k3AiALz5QUw -K0juG0B9k3AiALz3QGh
pK0juG0B9k3AiALz5gU5 zq0juG0B9k3AiALz6Aal
Xa0juG0B9k3AiALz2QCh RK0juG0B9k3AiALz5QUw
560juG0B9k3AiALz3QGh v60juG0B9k3AiALz6Aal
-K0juG0B9k3AiALz3QGh Xa0juG0B9k3AiALz2QCh
-60juG0B9k3AiALz3gGn P60juG0B9k3AiALz5QUw
F60juG0B9k3AiALz3gKn pK0juG0B9k3AiALz5gU5
m60juG0B9k3AiALz5gU5 0a0juG0B9k3AiALz6Aal
zq0juG0B9k3AiALz6Aal 3K0juG0B9k3AiALz3QGh
ua0juG0B9k3AiALz7wqa 8KwjuG0B9k3AiALz1v58
3K0juG0B9k3AiALz3QGh F60juG0B9k3AiALz3gKn
P60juG0B9k3AiALz5QUw c60juG0B9k3AiALz5gU4
我尝试了将主题作为 KStreams 和 KTables 加入的组合(以及我能想到的每一种组合),但是在这个小子集中的 24 条消息中,只有大约 5 条连接是成功的。
当前代码的当前示例(和略微简化):
val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)
val finalEvents = events
.join(users, eventsProcessor::enrichWithUsers)
.to("final_events")
鉴于 raw_events 和 processing_users 主题中有对应的对 (1:1),是否有任何解释为什么某些连接会成功而其他连接会失败?只有 5 对将始终进入final_events 主题(总是相同的对)。
欢迎提供任何其他建议!
配置
为了详细起见,这里有几点值得注意的设置:
- 使用 Kafka Streams 2.3.0
- 为所有适用的物化调用分别启用/禁用缓存和日志记录
- 拓扑优化已启用
- 缓存缓冲设置为 0
更新
在花了几个小时仔细研究并深入研究数据之后,简而言之,这个问题似乎与分区有关。
五个连接一直成功,只是因为键位于每个主题的相同分区上:
successful events raw_events partition processing_users partition
RK0juG0B9k3AiALz5QUw 3 3
m60juG0B9k3AiALz5gU5 7 7
ua0juG0B9k3AiALz7wqa 7 7
8KwjuG0B9k3AiALz1v58 8 8
RKwjuG0B9k3AiALz1P7C 9 9
尽管两个主题中都存在所有键,但它们似乎没有使用相同的策略进行分区(即两个主题包含所有具有相同键的消息,但有些可能会出现在raw_events 的一个分区上,但在processing_users 的另一个分区上),如下面的分区/计数表示所示:
值得强调的是,raw_events 主题中出现的消息是在上述流应用程序工作流程之外产生的,这让我相信这些问题需要得到回答:
- 假设分区策略导致跨分区的标准化分布,是否可以让分区策略的责任仅落在流工作流的入口点上? (例如,如果给定的密钥在
raw_events的分区 7 中,并且您将具有相同密钥的记录发送到preprocessing_users,它会落入分区 7? - 如果是这样,这是一个合理的策略吗?或者有没有办法在不编写所有生产者和流应用程序使用的自定义分区器的情况下强制执行此行为?
- 如果没有,是否可以采用现有主题(在本例中为
raw_event并基本上重新分区整个主题以便使用默认分区策略?
【问题讨论】:
标签: kotlin apache-kafka apache-kafka-streams