【问题标题】:JSON Kafka spout in Apache StormApache Storm 中的 JSON Kafka spout
【发布时间】:2020-05-01 15:03:51
【问题描述】:

我正在使用 Kafka spout 构建 Storm 拓扑。我正在以 JSON 格式从 Kafka(没有 Zookeeper)消费,Storm 应该输出它。
如何为 JSON 数据类型定义正确的架构? 目前,我有这样的代码库和基本的 spout 实现:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

我是 Apache Storm 的新手,因此很乐意提供任何建议。

【问题讨论】:

  • “Kafka(没有 Zookeeper)”是什么意思?你仍然需要 Zookeeper,而且 broker 不应该充当生产者应用程序,无论如何
  • 另外,storm 是必需的吗? Kafka Streams 应该能够做同样的事情
  • 我已经将 Kafka 设置为没有 Zookeper 的 docker 容器用于本地开发。 Storm 是必需的
  • Kafka 需要 Zookeeper ... Storm 只会将其作为字符串传递
  • 是的,我使用了 Spotify 镜像,但 Kafka 生产者和 Spark/Flink 消费者一样工作正常,但 Storm 不行

标签: json apache-kafka apache-storm spout


【解决方案1】:

你可以做几件事:

您可以定义RecordTranslator。此接口允许您根据从 Kafka 读取的 ConsumerRecord 定义 spout 将如何构造元组。

默认实现如下所示:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

如您所见,您将获得一个ConsumerRecord,这是一个内置在底层Kafka客户端库中的类型,然后必须将其转换为一个List&lt;Object&gt;,这将是元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您的做法。例如,如果您想将键、值和偏移量填充到随后发出的数据结构中,您可以在此处执行此操作。你使用像KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()这样的翻译器

如果您只想将键/值反序列化为您自己的数据类之一,更好的选择是实现Deserializer。这将让您定义如何反序列化从 Kafka 获得的键/值。如果你想反序列化例如你自己的数据类的值,你可以使用这个接口来完成。

默认的StringDeserializer 这样做:

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

一旦您创建了自己的Deserializer,您就可以通过执行KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build() 之类的操作来使用它。有一个类似的消费者属性用于设置值反序列化器。

【讨论】:

    猜你喜欢
    • 2018-09-19
    • 2019-03-14
    • 2016-11-12
    • 2019-05-04
    • 1970-01-01
    • 1970-01-01
    • 2015-03-13
    • 2017-08-24
    • 2013-06-24
    相关资源
    最近更新 更多