【发布时间】:2020-08-17 01:51:33
【问题描述】:
我正在尝试使用 flink 从 kafka 主题中读取 json 消息。
我正在使用 Kafka 2.4.1 和 Flink 1.10
我为我的消费者设置了:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer(KAFKA_TOPIC_INPUT,
new JSONKeyValueDeserializationSchema(false), properties);
当我使用 SimpleStringSchema 时,我将 json 作为文本得到,这很好,但使用 JSONKeyValueDeserializer 我得到:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
sensor_5 将是主题中的一个关键我猜我需要添加其他内容来从馈送到序列化程序的 kafka 消息值中获取 JSON 并以某种方式处理该密钥,但我不确定?
有什么建议吗?
json结构为:
{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
它是通过
提交的# Python 3
import json
from confluent_kafka import Producer
dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})
producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))
【问题讨论】:
-
值在进入Kafka之前是如何序列化的?
-
python3
confluent_kafka生产者和json.dumps(data_dict)。 dict 只是一个普通的 python 字典,混合了带有不同键的字符串和浮点值 -
能否请您也发布 JSON。那么,我可以在我的 PC 中重现该问题吗?
-
添加了完整的代码来生成消息
标签: java json apache-kafka apache-flink