【问题标题】:Extra bytes with KafkaAvroSerializerKafkaAvroSerializer 的额外字节
【发布时间】:2019-07-05 15:06:55
【问题描述】:

我的设置如下:我从 ftp 服务器检索 xml 文件,将它们解组到 POJO,将其映射到 Avro 生成的类,然后将其转发到 Alpakkas's Producer Sink,如下所示:

Ftp.ls("/", ftpSettings)
  .filter(FtpFile::isFile)
  .mapAsyncUnordered(10,
    ftpFile -> {
      CompletionStage<ByteString> fetchFile =
        Ftp.fromPath(ftpFile.path(), ftpSettings).runWith(Sink.reduce((a, b) -> a), materializer);
      return fetchFile;
    })
  .map(b -> b.decodeString(Charsets.ISO_8859_1))
  .map(StringReader::new)
  .map(AlpakkaProducerDemo::unmarshalFile)
  .map(AlpakkaProducerDemo::convertToAvroSerializable)
  .map(a -> new ProducerRecord<>(kafkaTopic, a.id().toString(), a))
  .map(record -> ProducerMessage.single(record))
  .runWith(Producer.committableSink(producerSettings, kafkaProducer), materializer);

问题是序列化显然不能正常工作。例如。我也希望将密钥进行 avro 序列化,尽管它只是一个字符串(要求,不要问)。配置如下:

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final KafkaAvroSerializer keyAvroSerializer = new KafkaAvroSerializer();
keyAvroSerializer.configure(kafkaAvroSerDeConfig, true);
final Serializer<Object> keySerializer = keyAvroSerializer;
final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings producerSettings = ProducerSettings.create(config, keySerializer, valueSerializer)
  .withBootstrapServers(kafkaServer);

在 Kafka 中,这会产生一个内容正确的键,但字符串开头有一些(明显的)额外字节:\u0000\u0000\u0000\u0000\u0001N。可以想象,这会对价值造成严重破坏。我怀疑 Avro 序列化不适合 Alpakka 使用的信封 API,因此可能需要事先序列化为 byte[] 并使用常见的 ByteSerializer。但是,那时使用SchemaRegistry 没有任何意义。

【问题讨论】:

    标签: serialization apache-kafka avro confluent-schema-registry alpakka


    【解决方案1】:

    前五个字节与序列化格式版本(字节 0)和架构注册表中的 Avro 架构版本(字节 1-4)有关:https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format

    另一种选择可能是使用 Kafka Connect,以及 FTP 源和 XML 转换。

    【讨论】:

    • 感谢您的快速澄清!我们考虑 Connect,但也想评估 Alpakka。字节出现在键/值中的任何明显原因?
    • 它们会出现,因为那是序列化线格式。如果你不想要它们……不要使用序列化器 :) 这是 Kafka Connect 是一个不错选择的另一个原因——它只适用于输入/输出序列化之类的东西。
    • 不知道为什么或如何,因为我没有改变任何东西,但是今天魔术字节没有出现并且序列化正常工作......
    猜你喜欢
    • 1970-01-01
    • 2010-11-13
    • 2013-11-24
    • 1970-01-01
    • 1970-01-01
    • 2015-05-15
    • 2018-07-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多