【问题标题】:Acknowledgement Kafka Producer Apache Beam致谢 Kafka Producer Apache Beam
【发布时间】:2020-03-21 21:21:35
【问题描述】:

如何获取在 apache beam KafkaIO 中收到确认的记录?

基本上,我希望我没有得到任何确认的所有记录都转到一个 bigquery 表,以便我可以稍后重试。我使用了文档中的以下代码 sn-p

    .apply(KafkaIO.<Long, String>read()
       .withBootstrapServers("broker_1:9092,broker_2:9092")
       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
       .withKeyDeserializer(LongDeserializer.class)
       .withValueDeserializer(StringDeserializer.class)

       // Above four are required configuration. returns PCollection<KafkaRecord<Long, String>>

       // Rest of the settings are optional :

       // you can further customize KafkaConsumer used to read the records by adding more
       // settings for ConsumerConfig. e.g :
       .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

       // set event times and watermark based on LogAppendTime. To provide a custom
       // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
       .withLogAppendTime()

       // restrict reader to committed messages on Kafka (see method documentation).
       .withReadCommitted()

       // offset consumed by the pipeline can be committed back.
       .commitOffsetsInFinalize()

       // finally, if you don't need Kafka metadata, you can drop it.g
       .withoutMetadata() // PCollection<KV<Long, String>>
    )
    .apply(Values.<String>create()) // PCollection<String>

【问题讨论】:

  • 我想你可以看看这里:medium.com/myheritage-engineering/…你能描述一下你项目中数据流动的过程吗?什么时候开始?
  • @muscat 谢谢你的文章。但是我的问题也没有在文章中回答

标签: apache-kafka google-cloud-dataflow apache-beam apache-beam-io


【解决方案1】:

默认情况下,Beam IO 旨在不断尝试写入/读取/处理元素,直到 . (重复错误后批处理管道会失败)

您所指的通常称为Dead Letter Queue,用于获取失败的记录并将它们添加到 PCollection、Pubsub 主题、排队服务等。这通常是可取的,因为它允许流式管道生成进度(不是阻塞),当遇到写入某些记录的错误时,允许写入成功的一次。

不幸的是,除非我弄错了,否则 Kafka IO 中没有实现死信队列。可以修改 KafkaIO 来支持这一点。在 Beam 邮件列表上进行了一些讨论,并提出了一些实现这一点的想法,might have some ideas

我怀疑可以将它添加到KafkaWriter,捕获失败的记录并将它们输出到另一个 PCollection。如果您选择实现此功能,也请联系梁community mailing list,如果您需要帮助将其合并到 master 中,他们将能够帮助确保更改涵盖必要的要求,以便可以合并并作为有意义的整为梁。

然后您的管道可以将它们写入其他地方(即不同的来源)。当然,如果该辅助源同时出现中断/问题,您将需要另一个 DLQ。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-26
    • 1970-01-01
    • 2018-07-22
    • 1970-01-01
    相关资源
    最近更新 更多