【发布时间】:2020-01-08 21:06:13
【问题描述】:
我需要在运行时下载 AVRO 架构,我需要传递引导服务器和 kafka 主题来解析正确的架构,但我找不到在反序列化器上传递这些参数的方法(硬编码除外)。您对如何执行此操作有任何想法吗?
val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
ops.setKafkaTopic(pars.kafkaTopic)
ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
val p = Pipeline.create(ops)
p.apply( KafkaIO.read<String, Measurement>()
.withTopic(pars.kafkaTopic)
.withBootstrapServers(pars.kafkaBootstrapServers)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializer(RemoteAvroDeserializer::class.java)
.withoutMetadata()
)
.apply(Values.create())
(TransformToMeasurementFN()))
.apply(
Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
.apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
.apply(Count.perElement())
.apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))
p.run()
这是我的反序列化器:
class RemoteAvroDeserializer : Deserializer<Measurement> {
val decoder: BinaryMessageDecoder<Measurement>
public constructor() {
val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
decoder = Measurement.createDecoder(schemaStore)
}
override fun deserialize(s: String, bytes: ByteArray): Measurement {
return decoder.decode(bytes)
}
override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
}
override fun close() {
}
}
【问题讨论】:
-
RemoteKafkaSchemaRegistryvs Confluent 现有的 Java Registry 客户端是什么? -
嗨,这只是我们自己实现的 Confluent Java Registry Client,它根据主题和注册表获取 Avro 方案
-
这不是已经做到了吗?默认架构始终位于
/subjects/topic-value
标签: scala apache-kafka apache-beam avro