【问题标题】:How to read binary data on Kafka topics in Spark如何在 Spark 中读取有关 Kafka 主题的二进制数据
【发布时间】:2018-10-06 14:32:04
【问题描述】:

我需要从 Kafka 主题中读取加密消息。我当前从主题中读取字符串的代码如下所示:

JavaPairReceiverInputDStream<String, String> pairrdd = 
            KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

我应该怎么做才能从 kafka 队列中更改此代码,以确保读取的字节数组,加密数据没有损坏

【问题讨论】:

  • 嗯...&lt;byte[], byte[]&gt; 会是一个好的开始

标签: java apache-spark apache-kafka spark-streaming


【解决方案1】:

要以&lt;byte[], byte[]&gt;的形式从Kafka读取数据,可以使用KafkaUtils。像这样-

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<byte[], byte[]>> pairrdd =
  KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams)
  );

我希望这会有所帮助!

【讨论】:

  • @kevin 有什么问题?
  • 嗯,我kafka中的数据是二进制消息,我想在sparkstreaming中读取,然后以字节为单位进行解析。
  • @kevin,在这种情况下,您可以使用ByteArrayDeserializer 而不是StringDeserializer。我已经相应地更新了我的答案。
猜你喜欢
  • 1970-01-01
  • 2023-03-19
  • 2020-09-18
  • 1970-01-01
  • 2016-05-27
  • 1970-01-01
  • 2019-06-24
  • 2021-06-11
  • 1970-01-01
相关资源
最近更新 更多