【问题标题】:How to pass parameters to an avro deserializer in apache beam (KafkaIO)?如何将参数传递给 Apache Beam(KafkaIO)中的 avro 反序列化器?
【发布时间】:2020-01-08 21:06:13
【问题描述】:

我需要在运行时下载 AVRO 架构,我需要传递引导服务器和 kafka 主题来解析正确的架构,但我找不到在反序列化器上传递这些参数的方法(硬编码除外)。您对如何执行此操作有任何想法吗?

    val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
    ops.setKafkaTopic(pars.kafkaTopic)
    ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
    ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
    val p = Pipeline.create(ops)

    p.apply( KafkaIO.read<String, Measurement>()
            .withTopic(pars.kafkaTopic)
            .withBootstrapServers(pars.kafkaBootstrapServers)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializer(RemoteAvroDeserializer::class.java)
            .withoutMetadata()
    )
            .apply(Values.create())
            (TransformToMeasurementFN()))
            .apply(
                    Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
            .apply(Count.perElement())
            .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

    p.run()

这是我的反序列化器:

class RemoteAvroDeserializer : Deserializer<Measurement> {
 val decoder: BinaryMessageDecoder<Measurement>

 public constructor() {
     val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
     decoder = Measurement.createDecoder(schemaStore)
 }

 override fun deserialize(s: String, bytes: ByteArray): Measurement {
     return decoder.decode(bytes)
 }

 override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
 }

 override fun close() {
 }
}

【问题讨论】:

  • RemoteKafkaSchemaRegistry vs Confluent 现有的 Java Registry 客户端是什么?
  • 嗨,这只是我们自己实现的 Confluent Java Registry Client,它根据主题和注册表获取 Avro 方案
  • 这不是已经做到了吗?默认架构始终位于 /subjects/topic-value

标签: scala apache-kafka apache-beam avro


【解决方案1】:

根据 Beam 文档,您可以像这样设置消费者配置

  KafkaIO... 
.withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))

我假设您可以在这里添加schema.registry.url 或其他任何内容

【讨论】:

  • 不幸的是,withValueDeserializer 只接受 Class>
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-07-12
  • 1970-01-01
  • 1970-01-01
  • 2023-03-11
  • 1970-01-01
  • 2016-10-28
  • 2016-03-24
相关资源
最近更新 更多