【发布时间】:2020-08-11 13:45:54
【问题描述】:
我正在尝试使用 KStream-KTable leftJoin 来丰富主题 A 和主题 B 的项目。主题 A 是我的 KStream,主题 B 是我的 KTtable,它有大约 23M 条记录。两个主题的键都没有计算,所以我必须使用 reducer 将 KStream(topic B) 转换为 KTable。
这是我的代码:
KTable<String, String> ktable = streamsBuilder
.stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
.filter((key, value) -> {...})
.transform(new KeyTransformer()) // generate new key
.groupByKey()
.reduce((aggValue, newValue) -> {...});
streamBuilder
.stream("TopicA")
.filter((key, value) -> {...})
.transform(...)
.leftJoin(ktable, new ValueJoiner({...}))
.transform(...)
.to("result")
1) KTable 初始化很慢。 (大约 2000 msg/s),这是正常的吗?我的主题只有 1 个分区。有什么方法可以提高性能? 我尝试设置以下内容以降低写入吞吐量,但似乎并没有太大改善。
CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000
2) 当从 Topic B 加载 KTable 时,连接发生。 这是连接发生时的偏移量(CURRENT-OFFSET/LOG-END-OFFSET)
Topic A: 32725/32726 (Lag 1)
Topic B: 1818686/23190390 (Lag 21371704)
我查看了Topic A失败记录的时间戳,是4天前的记录,Topic B最后一次处理的记录是6天前。 据我了解,kstream 进程记录基于时间戳,我不明白为什么在我的情况下,KStream(Topic A) 没有等到 KTable(Topic B) 完全加载到 4 天前触发加入。
我也尝试设置时间戳提取器返回 0,但效果不佳。
更新:将时间戳设置为 0 时,出现以下错误:
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
我还尝试将 max.task.idle.ms 设置为 > 0(3 秒和 30 分钟),但仍然遇到相同的错误。
更新:我通过将 customTimestampsExtractor 设置为 6 天前修复了“UnknownProducerIdException”错误,这仍然早于主题 A 的记录。我认为(不确定)设置为 0 会触发导致此错误的更改日志上的保留。但是,在 ktable 完成加载之前,join 仍然无法正常工作。这是为什么呢?
我正在使用 Kafka Streams 2.3.0。
我在这里做错了吗?非常感谢。
【问题讨论】:
-
已知旧时间戳的问题:issues.apache.org/jira/browse/KAFKA-6817
-
嗨 Matthias,感谢 Jira 链接。关于我的 2) 问题,我在stackoverflow.com/questions/57498201/… 中阅读了您的回答,您说的是 2.1.0,时间戳是严格同步的。但是,在我的测试中,它的行为并非如此。即使 ktable topic 中的所有记录早于 kstream 中的所有记录,join 也无需等待 ktable 完成加载即可开始。这是一个已知问题吗?
-
我不知道有任何问题——因为您已经使用
max.task.idle.ms,我不确定问题可能是什么。但请注意,您的表不是直接从主题 B 加载的:当您更改密钥时,数据会在reduce之前重新分区(类似于流输入,即在join() 之前发生重新分区) ),这也可能导致时间戳同步问题。重新分区将导致交错写入,从而导致数据乱序。 -
你有没有想过这个问题?
-
我将逻辑改为使用 GlobalKTable
标签: apache-kafka apache-kafka-streams