【Posted】: 2020-03-21 21:21:35
【Problem Description】:
How can I get the records that received an acknowledgment in Apache Beam KafkaIO?
Basically, I want all records for which I did not get an acknowledgment to go to a BigQuery table so that I can retry them later. I used the following code snippet from the documentation:
.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    // The above four are required configuration; this returns PCollection<KafkaRecord<Long, String>>.
    // The rest of the settings are optional:
    // You can further customize the KafkaConsumer used to read the records by adding more
    // settings for ConsumerConfig, e.g.:
    .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
    // Set event times and the watermark based on LogAppendTime. To provide a custom
    // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
    .withLogAppendTime()
    // Restrict the reader to committed messages on Kafka (see method documentation).
    .withReadCommitted()
    // Offsets consumed by the pipeline can be committed back.
    .commitOffsetsInFinalize()
    // Finally, if you don't need Kafka metadata, you can drop it.
    .withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>
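For context, a common way to approach "send records that failed to a retry table" in Beam is a dead-letter pattern: try to process each record, and route any that fail to a side output that is later written to BigQuery. The sketch below shows only the routing idea in plain Java with no Beam dependency; the class, method, and the parse-as-integer "processing" step are all hypothetical stand-ins. In a real pipeline this would be a ParDo with two TupleTags and ParDo#withOutputTags.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of dead-letter routing (plain Java, no Beam dependency).
// All names here are illustrative, not part of the Beam API.
public class DeadLetterDemo {

    // Holds the two routed collections: successfully processed records
    // and records that failed and should be retried later.
    public static class Result {
        public final List<String> ok = new ArrayList<>();
        public final List<String> dead = new ArrayList<>();
    }

    // Route each record: here "processing" is just parsing an integer;
    // anything that throws is sent to the dead-letter collection.
    public static Result route(List<String> records) {
        Result r = new Result();
        for (String rec : records) {
            try {
                Integer.parseInt(rec); // stand-in for real processing logic
                r.ok.add(rec);
            } catch (NumberFormatException e) {
                r.dead.add(rec);       // in a real pipeline: write to a BigQuery retry table
            }
        }
        return r;
    }

    public static void main(String[] args) {
        Result r = route(List.of("1", "oops", "2"));
        System.out.println("ok=" + r.ok + " dead=" + r.dead);
    }
}
```

In a Beam pipeline the same try/catch would live inside a DoFn's @ProcessElement method, emitting to the main output on success and to a dead-letter TupleTag on failure, with the dead-letter PCollection applied to a BigQueryIO write.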
【Discussion】:
-
I think you can take a look here: medium.com/myheritage-engineering/… Could you describe how data flows through your project? When does it start?
-
@muscat Thanks for the article, but it does not answer my question either.
Tags: apache-kafka google-cloud-dataflow apache-beam apache-beam-io