【发布时间】:2018-09-10 09:26:27
【问题描述】:
我尝试为我的自定义类实现一种方法,使用 Flink Kafka 连接器在 Kafka 上生成数据。类原型如下:
public class StreamData implements Serializable {
private transient StreamExecutionEnvironment env;
private DataStream<byte[]> data ;
...
将数据写入特定Kafka主题的方法如下:
public void writeDataIntoESB(String id) throws Exception {
FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
"localhost:9092",
id,
new KeyedSerializationSchema<byte[]>() {
@Override
public byte[] serializeKey(byte[] bytes) {
return bytes;
}
@Override
public byte[] serializeValue(byte[] bytes) {
return bytes;
}
@Override
public String getTargetTopic(byte[] bytes) {
return null;
}
});
data.addSink(producer);
}
我有另一种方法可以从 Kafka 主题获取数据到对象的 data 字段,效果很好。现在尝试从 Kafka 主题获取数据并将其写入另一个 Kafka 主题,但出现错误:
org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable
主要代码:
StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");
似乎 Java 试图序列化对象而不仅仅是字段data!使用 Flink Kafka 连接器向 Kafka 生成数据的代码已经过测试,并且可以正常使用(我的意思是不使用类并在 main 中编写所有代码)
我怎样才能消除错误?
【问题讨论】:
-
我认为问题在于当它试图创建您的实际执行环境时,它需要能够将所有内容序列化到作业管理器中,但是您将 DataStream 嵌套在您的对象中,所以这是'不会翻译得很好。您能否尽可能多地编辑您的示例,并按照您认为合适的方式进行清理?很难确切地看到您要做什么,但我认为这与 Kafka 的问题无关
-
@JoshuaDeWald 正如我所说,这是因为 Java 或 Flink 的特性,因为我在
main函数中测试了没有类的相同代码并且它可以工作。其他方法,比如从 Kafka 读取数据或者在 Cassandra 上写入数据效果很好,只是向 Kafka 写入数据的方法不起作用!
标签: serialization apache-kafka apache-flink