【问题标题】:Identically Keyed Topics Failing to Join in Kafka Streams相同键的主题未能加入 Kafka 流
【发布时间】:2020-02-22 17:57:10
【问题描述】:

我最近在流应用程序中遇到了一个我以前没有遇到过的问题,并且很难追踪与键控/连接(以及更新后的分区)相关的问题。

我有两个主题(raw_events 和 processes_users),它们的键控相同,但是当我尝试对这两个主题执行连接时,只有 一些 连接成功,尽管键控相同。

工作流程(用于上下文)

为简洁起见,应用程序的基本工作流程如下:

  1. 数据通过生产者流入raw_event 主题。
  2. 一系列流应用程序侦听raw_event 主题并根据一系列业务规则(例如IP 地址、用户等)从中提取各种实体
  3. raw_event 主题中识别的实体被放入preprocessing_{type} 主题中,其中包含有关提取的元数据和在raw_event 中找到的相关信息(例如,对于用户而言,这可能是帐户名称、电子邮件等)。 这些主题中的项目由raw_event 键入。
  4. 另一系列流应用程序将侦听各种preprocessing_{type} 主题,并与一系列GlobalKTables 相结合,这些GlobalKTables 代表该给定实体final_{type} 的所有已知实例。对于成功的连接,来自final_{type} 的实例将使用来自raw_event/preprocessing_{type} 主题的新信息进行丰富;不成功的连接将指示给定类型的新实体,然后将其键入并放入final_{type} 主题。 preprocessing_{type} 的所有丰富实例都插入到 processing_{type} 主题中,该主题包含实体的丰富(或新)实例以及创建它的元数据。最重要的是 - processed_{type} 主题中的项目仍由 raw_event 键入。
  5. 最后,一个流应用程序运行并尝试通过加入 processing_{type} 来丰富来自 raw_event 的原始实例,这将是相同的键,并使用来自丰富实体的各​​种信息丰富 raw_event 实例,在将其推送到 final_event 主题之前。

问题

问题本身出现在上面的第 5 步(事件丰富)中,因为只有 raw_event 主题和 processing_users 主题之间的部分连接按预期工作。

使用通过整个管道的 24 条记录的子集,主题中的 24 对中只有 5 对成功加入。有效的那些似乎是相同的一致的,但我在数据中看不到任何表明为什么一个有效而另一个无效:

raw_event keys          processing_user keys
mawjuG0B9k3AiALz0_2S    0q0juG0B9k3AiALz8ApP 
xEEcv20B9k3AiALzEN0m    m60juG0B9k3AiALz5gU5 
zqwjuG0B9k3AiALzz_tg    ua0juG0B9k3AiALz7wqa 
v60juG0B9k3AiALz6Aal    xEEcv20B9k3AiALzEN0m 
0q0juG0B9k3AiALz8ApP    zqwjuG0B9k3AiALzz_tg 
RK0juG0B9k3AiALz5QUw    zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal    Ta0juG0B9k3AiALz5QUw 
8KwjuG0B9k3AiALz1v58    RKwjuG0B9k3AiALz1P7C 
c60juG0B9k3AiALz5gU4    -60juG0B9k3AiALz3gGn 
RKwjuG0B9k3AiALz1P7C    Va0juG0B9k3AiALz5QUw 
zK0juG0B9k3AiALz6Aal    560juG0B9k3AiALz3QGh 
Ta0juG0B9k3AiALz5QUw    mawjuG0B9k3AiALz0_2S 
Va0juG0B9k3AiALz5QUw    -K0juG0B9k3AiALz3QGh 
pK0juG0B9k3AiALz5gU5    zq0juG0B9k3AiALz6Aal 
Xa0juG0B9k3AiALz2QCh    RK0juG0B9k3AiALz5QUw 
560juG0B9k3AiALz3QGh    v60juG0B9k3AiALz6Aal 
-K0juG0B9k3AiALz3QGh    Xa0juG0B9k3AiALz2QCh 
-60juG0B9k3AiALz3gGn    P60juG0B9k3AiALz5QUw 
F60juG0B9k3AiALz3gKn    pK0juG0B9k3AiALz5gU5 
m60juG0B9k3AiALz5gU5    0a0juG0B9k3AiALz6Aal 
zq0juG0B9k3AiALz6Aal    3K0juG0B9k3AiALz3QGh 
ua0juG0B9k3AiALz7wqa    8KwjuG0B9k3AiALz1v58 
3K0juG0B9k3AiALz3QGh    F60juG0B9k3AiALz3gKn 
P60juG0B9k3AiALz5QUw    c60juG0B9k3AiALz5gU4 

我尝试了将主题作为 KStreams 和 KTables 加入的组合(以及我能想到的每一种组合),但是在这个小子集中的 24 条消息中,只有大约 5 条连接是成功的。

当前代码的当前示例(和略微简化):

val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)

val finalEvents = events
    .join(users, eventsProcessor::enrichWithUsers)
    .to("final_events")

鉴于 raw_eventsprocessing_users 主题中有对应的对 (1:1),是否有任何解释为什么某些连接会成功而其他连接会失败?只有 5 对将始终进入final_events 主题(总是相同的对)。

欢迎提供任何其他建议!

配置

为了详细起见,这里有几点值得注意的设置:

  • 使用 Kafka Streams 2.3.0
  • 为所有适用的物化调用分别启用/禁用缓存和日志记录
  • 拓扑优化已启用
  • 缓存缓冲设置为 0

更新

在花了几个小时仔细研究并深入研究数据之后,简而言之,这个问题似乎与分区有关。

五个连接一直成功,只是因为键位于每个主题的相同分区上:

successful events       raw_events partition  processing_users partition
RK0juG0B9k3AiALz5QUw    3                     3
m60juG0B9k3AiALz5gU5    7                     7
ua0juG0B9k3AiALz7wqa    7                     7
8KwjuG0B9k3AiALz1v58    8                     8
RKwjuG0B9k3AiALz1P7C    9                     9

尽管两个主题中都存在所有键,但它们似乎没有使用相同的策略进行分区(即两个主题包含所有具有相同键的消息,但有些可能会出现在raw_events 的一个分区上,但在processing_users 的另一个分区上),如下面的分区/计数表示所示:

值得强调的是,raw_events 主题中出现的消息是在上述流应用程序工作流程之外产生的,这让我相信这些问题需要得到回答:

  • 假设分区策略导致跨分区的标准化分布,是否可以让分区策略的责任仅落在流工作流的入口点上? (例如,如果给定的密钥在 raw_events 的分区 7 中,并且您将具有相同密钥的记录发送到 preprocessing_users,它会落入分区 7?
  • 如果是这样,这是一个合理的策略吗?或者有没有办法在不编写所有生产者和流应用程序使用的自定义分区器的情况下强制执行此行为?
  • 如果没有,是否可以采用现有主题(在本例中为 raw_event 并基本上重新分区整个主题以便使用默认分区策略?

【问题讨论】:

    标签: kotlin apache-kafka apache-kafka-streams


    【解决方案1】:

    正如对原始帖子的更新所详述,问题本身是 .NET Producer 应用程序之间分区策略差异的结果,该应用程序默认使用 consistent_random 分区策略,而不是默认的 Java 流应用程序其中使用murmur2random 策略。

    有几个选项可以解决这个问题,但在这种特殊情况下,最简单的方法是调整生产者以使用适当的策略:

    // Set the default partitioning strategy 
    ProducerConfig.Partitioner = Partitioner.Murmur2Random;
    

    另一种方法可能是编写一个 CustomPartitioner 类,该类将实施您喜欢的分区策略来模仿您的生产者。

    【讨论】:

      猜你喜欢
      • 2019-10-11
      • 1970-01-01
      • 2018-12-10
      • 2020-05-01
      • 2019-06-01
      • 2018-02-06
      • 1970-01-01
      • 1970-01-01
      • 2019-07-27
      相关资源
      最近更新 更多