【问题标题】:Kafka Connect Deserializing byte arrayKafka Connect 反序列化字节数组
【发布时间】:2018-09-08 07:52:49
【问题描述】:

我正在尝试借助 Kafka 连接接收字节数组序列化的 Avro 消息。 用于序列化 avro 数据的生产者配置

key.serializer-org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer-org.apache.kafka.common.serialization.ByteArraySerializer

hdfs 接收器配置

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=csvtopic
hdfs.url=hdfs://10.15.167.119:8020
flush.size=3
locale=en-us
timezone=UTC
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schema.registry.url=http://localhost:8081
hive.metastore.uris=thrift://10.15.167.119:9083
hive.integration=true
schema.compatibility=BACKWARD

如果我从 hdfs quickstart-hdfs.properties 中删除 hive 集成和 format.class,我可以将数据保存到 HDFS。 启用 hive 集成后,我会收到以下异常堆栈跟踪

java.lang.RuntimeException: org.apache.kafka.connect.errors.SchemaProjectorException: Schema version required for BACKWARD compatibility
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:401)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

如何反序列化从Kafka主题接收到的字节流并将其保存在hive中??

【问题讨论】:

    标签: apache-kafka avro apache-kafka-connect


    【解决方案1】:

    如果您将 Avro 与架构注册表一起用于您的消息,您应该使用 AvroConverter 而不是 ByteArrayConverter,即:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    

    【讨论】:

    • 感谢您的回复。当我使用 avroconverter 时,出现以下异常 Error deserializing Avro message for id -1 Unknown magic byte!”。
    • 数据是如何产生的?如果它是使用 Confluent Schema Registry 序列化器(例如使用 Kafka Connect 源)序列化的 Avro,那么它可以正常工作。
    • 数据未使用 Confluent 模式注册表序列化技术进行序列化。它已经用 org.apache.kafka.common.serialization.ByteArraySerializer 进行了序列化。 Kafka连接数据的序列化方式有什么限制吗??
    • 如果你想使用模式注册表(例如你指定 schema.compatibility)那么你需要使用 Confluent avro 序列化器,是的。
    • 抱歉偏离了话题。在flume hdfs sink中,我们可以使用拦截器将模式附加到传入数据并使用风筝数据集,可以在hive中管理数据(例如:-github.com/kite-sdk/kite-examples/blob/master/json/README.md)。这种功能可以通过kafka connect来实现吗??我无法控制传入数据以使用融合模式注册表生成数据。我需要找到一种方法将传入的字节数组序列化数据保存到 hive 和 hdfs 中。
    【解决方案2】:

    我浏览了您的 cmets 和您的代码。 您正在使用 ByteArrayOutputStream 进行编码,并且 kafka-connect 无法理解此类数据。而是使用以下方式发送数据。

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://localhost:8081");
    KafkaProducer producer = new KafkaProducer(props);
    

    发送数据时使用这个,

     GenericData.Record record = new GenericData.Record(User.getClassSchema());
     record.put("favorite_color", user.getFavoriteColor());
     record.put("favorite_number", user.getFavoriteNumber());
     record.put("name", user.getName());
     ProducerRecord<Object, Object> precord = new ProducerRecord<>("topic1",record);
     producer.send(precord);
    

    在您的 kafka 连接配置中使用:

    key.converter=io.confluent.connect.avro.AvroConverter
    value.converter=io.confluent.connect.avro.AvroConverter
    

    【讨论】:

    • 使用 Confluent Avro 转换器我能够传输数据并保存在 hive 和 hdfs 中。我正在寻找一种使用模式注册表中可用的模式来反序列化字节数组的方法。
    • 如果你使用 io.confluent.connect.avro.AvroConverter 在你的 kafka 连接中反序列化,那么你不能使用字节数组发送。这是因为 AvroConverter 不像普通的 avro 反序列化器,而是查找版本数据(这是在序列化时使用 AvroConverter 时放置的元数据)。它将此版本添加到模式注册表中以使用此版本进行查找以获取整个模式。像这样的序列化也可以通过网络节省大量数据,因为不需要发送带有每条记录的模式,只需要发送版本。
    猜你喜欢
    • 2021-07-01
    • 2011-09-29
    • 1970-01-01
    • 2018-07-26
    • 1970-01-01
    • 2012-03-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多