【问题标题】:KStream-KTable LeftJoin, Join occured while KTable is not fully loadedKStream-KTable LeftJoin,在 KTable 未完全加载时发生 Join
【发布时间】:2020-08-11 13:45:54
【问题描述】:

我正在尝试使用 KStream-KTable leftJoin 来丰富主题 A 和主题 B 的项目。主题 A 是我的 KStream,主题 B 是我的 KTtable,它有大约 23M 条记录。两个主题的键都没有计算,所以我必须使用 reducer 将 KStream(topic B) 转换为 KTable。

这是我的代码:

KTable<String, String> ktable = streamsBuilder
     .stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
     .filter((key, value) -> {...})
     .transform(new KeyTransformer()) // generate new key
     .groupByKey()
     .reduce((aggValue, newValue) -> {...});

streamBuilder
     .stream("TopicA")
     .filter((key, value) -> {...})
     .transform(...)
     .leftJoin(ktable, new ValueJoiner({...}))
     .transform(...)
     .to("result")

1) KTable 初始化很慢。 (大约 2000 msg/s),这是正常的吗?我的主题只有 1 个分区。有什么方法可以提高性能? 我尝试设置以下内容以降低写入吞吐量,但似乎并没有太大改善。

CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000

2) 当从 Topic B 加载 KTable 时,连接发生。 这是连接发生时的偏移量(CURRENT-OFFSET/LOG-END-OFFSET)

   Topic A: 32725/32726 (Lag 1)
   Topic B: 1818686/23190390 (Lag 21371704)

我查看了Topic A失败记录的时间戳,是4天前的记录,Topic B最后一次处理的记录是6天前。 据我了解,kstream 进程记录基于时间戳,我不明白为什么在我的情况下,KStream(Topic A) 没有等到 KTable(Topic B) 完全加载到 4 天前触发加入。

我也尝试设置时间戳提取器返回 0,但效果不佳。

更新:将时间戳设置为 0 时,出现以下错误:

Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. 

我还尝试将 max.task.idle.ms 设置为 > 0(3 秒和 30 分钟),但仍然遇到相同的错误。

更新:我通过将 customTimestampsExtractor 设置为 6 天前修复了“UnknownProducerIdException”错误,这仍然早于主题 A 的记录。我认为(不确定)设置为 0 会触发导致此错误的更改日志上的保留。但是,在 ktable 完成加载之前,join 仍然无法正常工作。这是为什么呢?

我正在使用 Kafka Streams 2.3.0。

我在这里做错了吗?非常感谢。

【问题讨论】:

  • 已知旧时间戳的问题:issues.apache.org/jira/browse/KAFKA-6817
  • 嗨 Matthias,感谢 Jira 链接。关于我的 2) 问题,我在stackoverflow.com/questions/57498201/… 中阅读了您的回答,您说的是 2.1.0,时间戳是严格同步的。但是,在我的测试中,它的行为并非如此。即使 ktable topic 中的所有记录早于 kstream 中的所有记录,join 也无需等待 ktable 完成加载即可开始。这是一个已知问题吗?
  • 我不知道有任何问题——因为您已经使用max.task.idle.ms,我不确定问题可能是什么。但请注意,您的表不是直接从主题 B 加载的:当您更改密钥时,数据会在 reduce 之前重新分区(类似于流输入,即在 join() 之前发生重新分区) ),这也可能导致时间戳同步问题。重新分区将导致交错写入,从而导致数据乱序。
  • 你有没有想过这个问题?
  • 我将逻辑改为使用 GlobalKTable

标签: apache-kafka apache-kafka-streams


【解决方案1】:

1.KTable初始化慢。 (大约 2000 msg/s),这正常吗?

这取决于您的网络,我认为限制是TopicB的消耗率,您使用的两个配置CACHE_MAX_BYTES_BUFFERING_CONFIGCOMMIT_INTERVAL_MS_CONFIG是在您想要产生多少KTable输出之间进行权衡(因为 KTable 更改日志是修订流)以及当您将 KTable 更新到底层主题和下游处理器时您接受多少延迟。详细查看Kafka Streams caching config for state storethis blog part Tables, Not Triggers

我认为提高TopicB消费率的好方法是增加分区。

  1. KStream.leftJoin(KTable,...) 始终是查表,总是将当前流记录与KTable 上最新更新的记录连接起来,在决定是否加入时不会考虑流时间。如果您想在加入时考虑直播时间,请查看KStream-KStream join

在您的情况下,此滞后是 TopicB 的滞后,这并不意味着 KTable 未完全加载。当您的 KTable 处于状态恢复过程中时,当它从 KTable 的底层更改日志主题中读取以在实际运行您的流应用程序之前恢复当前状态时,您的 KTable 未完全加载,以防万一您将无法进行连接,因为流应用程序不是运行直到状态完全恢复。

【讨论】:

  • 感谢您的回复。对于2),我想加入主题B的当前状态,这是我期望kafka会做的,但是现在,当状态没有从主题B完全恢复时,加入正在发生。所以查找失败。你怎么能让kstream等到ktable完成加载所有内容然后我的kstream开始?我尝试了stackoverflow.com/questions/56556270/… 中描述的方法。但对我不起作用。
  • 是的,上面问题的问题与同步问题有关,你的问题是不同的。我认为您可以从拓扑中临时删除 streamBuilder.stream("TopicA")... ,只需保留构建 KTable 的代码,这样它就可以先赶上 TopicB 的滞后,之后您重新添加上述拓扑即可正常运行后加入
  • 哦,如果您还将max.task.idle.ms 设置为大于零(例如 3 秒),那么 Matthias 提出的将时间戳更改为 0 的建议似乎也有效,因此 StreamThread 可以缓冲记录以在实际之前等待 KTable 追赶处理来自 TopicA 的消息
  • 我更新了上面的描述,添加了我将时间戳设置为 0 后得到的错误。谢谢
  • 我不确定这是否与这个问题有关,主题A和主题B中的数据是使用Kafka 2.0.0生成的,后来我升级到了2.4。这有关系吗?
猜你喜欢
  • 2020-10-17
  • 2018-02-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-07-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多