【问题标题】:Flink Kafka producer: Object of class is not serializableFlink Kafka生产者:类的对象不可序列化
【发布时间】:2018-09-10 09:26:27
【问题描述】:

我尝试为我的自定义类实现一种方法,使用 Flink Kafka 连接器在 Kafka 上生成数据。类原型如下:

public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data ;
    ...

将数据写入特定Kafka主题的方法如下:

public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
                "localhost:9092",
                id,
                new KeyedSerializationSchema<byte[]>() {
                    @Override
                    public byte[] serializeKey(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public byte[] serializeValue(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public String getTargetTopic(byte[] bytes) {
                        return null;
                    }
                });               
        data.addSink(producer);
    }

我有另一种方法可以从 Kafka 主题获取数据到对象的 data 字段,效果很好。现在尝试从 Kafka 主题获取数据并将其写入另一个 Kafka 主题,但出现错误:

org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable

主要代码:

StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");

似乎 Java 试图序列化对象而不仅仅是字段data!使用 Flink Kafka 连接器向 Kafka 生成数据的代码已经过测试,并且可以正常使用(我的意思是不使用类并在 main 中编写所有代码)

我怎样才能消除错误?

【问题讨论】:

  • 我认为问题在于当它试图创建您的实际执行环境时,它需要能够将所有内容序列化到作业管理器中,但是您将 DataStream 嵌套在您的对象中,所以这是'不会翻译得很好。您能否尽可能多地编辑您的示例,并按照您认为合适的方式进行清理?很难确切地看到您要做什么,但我认为这与 Kafka 的问题无关
  • @JoshuaDeWald 正如我所说,这是因为 Java 或 Flink 的特性,因为我在 main 函数中测试了没有类的相同代码并且它可以工作。其他方法,比如从 Kafka 读取数据或者在 Cassandra 上写入数据效果很好,只是向 Kafka 写入数据的方法不起作用!

标签: serialization apache-kafka apache-flink


【解决方案1】:

我认为问题的原因是您的代码正在这样做:

new KeyedSerializationSchema<byte[]>() {...}

这段代码的作用是创建一个 KeyedSerializationSchema 的匿名子类作为定义类 (StreamData) 的内部类。每个内部类都包含对外部类实例的隐式引用,因此使用默认的 Java 序列化规则对其进行序列化也将传递地尝试序列化外部对象 (StreamData)。解决此问题的最佳方法是将 KeyedSerializationSchema 的子类声明为:

我认为最后一种方法如下所示:

public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);               
        data.addSink(producer);
    }
}

【讨论】:

  • 您对我的解决方案有何想法:将字段 data 更改为 static。请参阅下面的帖子。
  • 好吧,我认为它不必要地序列化 StreamData 对象,并且如果您将来添加任何非静态非瞬态字段,它也会尝试序列化。
【解决方案2】:

你也可以像这样在 Flink 中进行序列化

dataStream.addSink(new FlinkKafkaProducer<KafkaObject>(ProducerTopic, new 
                                             CustomSerializerSchema(),properties));

  public class CustomSerializerSchema implements SerializationSchema<MyUser> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyUser element) {
        byte[] b = null;
         try {
             b= new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b; 
    }
}

【讨论】:

    【解决方案3】:

    data 属性设为静态,解决了问题。谁能详细说明一下,这是一个好的解决方案吗?

    private static DataStream<byte[]> data ;
    

    【讨论】:

      猜你喜欢
      • 2019-10-07
      • 2020-04-29
      • 2018-12-08
      • 1970-01-01
      • 2020-08-17
      • 2018-01-31
      • 2018-01-17
      • 2018-10-17
      • 1970-01-01
      相关资源
      最近更新 更多