【问题标题】:Serialization of java.util.map and custom pojo in FlinkFlink 中 java.util.map 和自定义 pojo 的序列化
【发布时间】:2021-06-29 19:47:09
【问题描述】:

我正在尝试使用不从 kinesis flink 应用程序中的外部库实现 Serializable 的 pojo。在 flatMap 函数中使用它时序列化失败。

波乔

public class ExecutionRecord {
    private Map<String, VariableGroup> factMap;
    private List<ModelResult> models;
    private List<RulesetResult> rulesets;
    private Outcome outcome;
    private ExecutionMetadata executionMetadata;
}

TypeInformation.of(ExecutionRecord.class).toString()的输出

PojoType<ExecutionRecord, fields = [executionMetadata: PojoType<ExecutionMetadata, fields = [endTime: String, evaluationType: String, executionHost: String, executionId: String, gmraInstanceIDs: GenericType<java.util.List>, startTime: String]>, factMap: GenericType<java.util.Map>, models: GenericType<java.util.List>, outcome: PojoType<Outcome, fields = [actions: GenericType<java.util.List>, failedActions: GenericType<java.util.List>, outcomeName: String]>, rulesets: GenericType<java.util.List>]>

错误- java.io.NotSerializableException: ExecutionRecord

堆栈跟踪也没有显示它无法序列化的特定字段。

我应该如何为java.util.listjava.util.map 注册序列化程序,它们被识别为泛型类型以及其他自定义pojos

【问题讨论】:

  • 您使用的是默认序列化程序(Kyro)吗? ExecutionRecord 是否满足所有requirements? (例如,我看不到私有字段的吸气剂)
  • @PeterCsala 我尝试了两种方式 - 强制启用 Kyro 以及让 flink 决定。是的,那些越来越满意了。 pojo 具有所有字段的公共 getter、setter 以及默认构造函数。我认为这就是为什么它从 TypeInformation 输出中被识别为 pojoType。

标签: java serialization apache-flink flink-streaming amazon-kinesis-analytics


【解决方案1】:

你可以这样做

public static final TypeInformation<ExecutionRecord> TYPE_INFORMATION_POJO = Types.POJO(ExecutionRecord.class);

or

public static final TypeInformation<ExecutionRecord> TYPE_INFORMATION = TypeInformation.of(BehProdViewFLDTO.class);

并将TYPE_INFORMATION_POJOTYPE_INFORMATION 传递给各州或在您可能需要时!

【讨论】:

    猜你喜欢
    • 2019-06-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-10
    • 2021-04-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多