【问题标题】:Flink deserialize Kafka JSONFlink 反序列化 Kafka JSON
【发布时间】:2020-08-17 01:51:33
【问题描述】:

我正在尝试使用 flink 从 kafka 主题中读取 json 消息。

我正在使用 Kafka 2.4.1 和 Flink 1.10

我为我的消费者设置了:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;


FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer(KAFKA_TOPIC_INPUT, 
                new JSONKeyValueDeserializationSchema(false), properties);

当我使用 SimpleStringSchema 时,我将 json 作为文本得到,这很好,但使用 JSONKeyValueDeserializer 我得到:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

sensor_5 将是主题中的一个关键我猜我需要添加其他内容来从馈送到序列化程序的 kafka 消息值中获取 JSON 并以某种方式处理该密钥,但我不确定?

有什么建议吗?

json结构为:

{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

它是通过

提交的
# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})

producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))

【问题讨论】:

  • 值在进入Kafka之前是如何序列化的?
  • python3 confluent_kafka 生产者和json.dumps(data_dict)。 dict 只是一个普通的 python 字典,混合了带有不同键的字符串和浮点值
  • 能否请您也发布 JSON。那么,我可以在我的 PC 中重现该问题吗?
  • 添加了完整的代码来生成消息

标签: java json apache-kafka apache-flink


【解决方案1】:

所以,基本上,如果你看一下JSONKeyValueDeserializationSchema 的源代码,你可以看到它如下所示:

    if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;

因此,通常架构期望您的密钥是 JSON 而不是字符串,因此 sensor_5 将失败。我认为最好和最简单的解决方案是创建您自己的以字符串为键的实现。

【讨论】:

  • 那么这是否意味着我必须使用org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema 创建我自己的类,然后在deserialize 方法中使用JSONKeyValueDeserializationSchema 作为反序列化模式,以便处理Kafka 密钥并json消息也被处理了?
【解决方案2】:

如果您不想在记录中包含您的密钥,您可以实现DeserializationSchema 而不是KeyedDeserializationSchema

一个例子如下:

public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = -1L;

    private ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return getForClass(ObjectNode.class);
    }
}

如果您想在记录中也包含密钥,您可以按照 Dominik Wosiński 的回答中提到的那样实现KeyedDeserializationSchema

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-10-21
    • 2020-09-20
    • 2021-10-20
    • 1970-01-01
    • 2023-03-23
    • 1970-01-01
    • 2019-10-07
    • 1970-01-01
    相关资源
    最近更新 更多