【问题标题】:Apache Flink: Read data from Kafka as byte arrayApache Flink:从 Kafka 读取数据作为字节数组
【发布时间】:2018-05-09 17:24:03
【问题描述】:

如何从 Kafka 读取 byte[] 格式的数据?

我有一个将事件读取为StringSimpleStringSchema() 的实现,但我找不到将数据读取为byte[] 的架构。

这是我的代码:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kafka1:9092");
    properties.setProperty("zookeeper.connect", "zookeeper1:2181");
    properties.setProperty("group.id", "test");
    properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    properties.setProperty("auto.offset.reset", "earliest");
    DataStream<byte[]> stream = env
                .addSource(new FlinkKafkaConsumer010<byte[]>("testStr", ? ,properties));

【问题讨论】:

    标签: deserialization apache-flink kafka-consumer-api flink-streaming


    【解决方案1】:

    对于scala,你应该写如下

    new AbstractDeserializationSchema[Array[Byte]](){
         override def deserialize(bytes: Array[Byte]): Array[Byte] = bytes
    }
    

    【讨论】:

      【解决方案2】:

      最后我发现:

      DataStream<byte[]> stream = env
                  .addSource(new FlinkKafkaConsumer010<>("testStr", new AbstractDeserializationSchema<byte[]>() {
                      @Override
                      public byte[] deserialize(byte[] bytes) throws IOException {
                          return bytes;
                      }
                  }, properties));
      

      【讨论】:

      • 成功了,你可以创建一个 PR 到 Flink Github 存储库。
      • 这不是问题,但它应该是一个实用反序列化类。我敢肯定,无论何时使用字节数组源,都会有很多人需要它。
      猜你喜欢
      • 1970-01-01
      • 2016-11-08
      • 2018-02-01
      • 2017-05-06
      • 2018-05-13
      • 1970-01-01
      • 2021-01-05
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多