【问题标题】:apache flink with Kafka: InvalidTypesExceptionapache flink 与 Kafka:InvalidTypesException
【发布时间】:2020-05-04 18:07:15
【问题描述】:

我有以下代码:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
                    "test-kafka-topic",
                    new SimpleStringSchema(),
                    properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
                    .map(new MapFunction<MyCustomClass,String>() {
                        @Override
                        public String map(MyCustomClass message) {
                            logger.info("--- Received message : " + message.toString());
                            return message.toString();
                        }
                    });

streamEnv.execute("Published messages");

MyCustomClassDeserializer 用于将字节数组转换为 java 对象。

当我在本地运行这个程序时,我得到了错误:

原因:org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

我得到这个代码行:

.map(new MapFunction<MyCustomClass,String>() {

不知道为什么我会得到这个?

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    所以,你有一个返回 POJO 的反序列化器,但你告诉 Flink 它应该使用 SimpleStringSchema 将记录从 byte[] 反序列化到 String。 现在看到问题了吗? :)

    我认为您通常不应该使用FlinkKafkaConsumer 中的自定义 Kafka 反序列化器。相反,您应该瞄准的是创建一个从 Flink 扩展 DeserializationSchema 的自定义类。它在类型安全性和可测试性方面应该要好得多。

    【讨论】:

    • 谢谢,知道了。实际上它是一个遗留代码,有太多这样的问题。
    猜你喜欢
    • 1970-01-01
    • 2018-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-18
    • 2020-12-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多