[Question Title]: Serialize object for Kafka in Flink
[Posted]: 2019-10-07 21:12:06
[Question Description]:

I am trying to use Flink to read from Kafka, apply some processing, and write the result to a different Kafka topic, but I get the following error: `org.apache.flink.api.common.InvalidProgramException: The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields.`

I receive a message from Kafka, run some operations on it, and return a list of objects that I want to send to a different topic.

class Wrapper implements Serializable {
    @JsonProperty("viewBuilderRequests")
    private ArrayList<ViewBuilderRequest> viewBuilderRequests;

    public Wrapper() {}

    public Wrapper(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
        this.viewBuilderRequests = viewBuilderRequests;
    }

    public List<ViewBuilderRequest> getViewBuilderRequests() {
        return viewBuilderRequests;
    }

    public void setViewBuilderRequests(ArrayList<ViewBuilderRequest> viewBuilderRequests) {
        this.viewBuilderRequests = viewBuilderRequests;
    }
}



public class ViewBuilderRequest implements Serializable {
    private CdmId cdmId;
    private ViewBuilderOperation operation;
    private List<ViewUserSystemIdentifier> viewUserSystemIdentifiers;

    public ViewBuilderRequest() {
    }

    public CdmId getCdmId() {
        return cdmId;
    }

    public void setCdmId(CdmId cdmId) {
        this.cdmId = cdmId;
    }

    public ViewBuilderOperation getOperation() {
        return operation;
    }

    public void setOperation(ViewBuilderOperation operation) {
        this.operation = operation;
    }

    public List<ViewUserSystemIdentifier> getViewUserSystemIdentifiers() {
        return viewUserSystemIdentifiers;
    }

    public void setViewUserSystemIdentifiers(List<ViewUserSystemIdentifier> viewUserSystemIdentifiers) {
        this.viewUserSystemIdentifiers = viewUserSystemIdentifiers;
    }

    public enum ViewBuilderOperation implements Serializable {
        Create, Update, Delete
    }
}




private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
    UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
    Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
    return wrapper;
};

The inner classes implement Serializable as well.

This is the code that throws the exception:

dataStream.map(parseAndSendToGraphProcessing)
    .addSink(new FlinkKafkaProducer<Wrapper>(kafkaConfiguration.getBootstrapServers(),
        "graphNotifications", new WrapperSchema()));

I also have a serialization/deserialization schema for these objects.

public class WrapperSchema implements DeserializationSchema<Wrapper>, SerializationSchema<Wrapper> {

    static ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Wrapper deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Wrapper.class);
    }

    @Override
    public boolean isEndOfStream(Wrapper nextElement) {
        return false;
    }

    @Override
    public byte[] serialize(Wrapper element) {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        }
        try {
            String json = objectMapper.writeValueAsString(element);
            return json.getBytes();
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public TypeInformation<Wrapper> getProducedType() {
        return TypeInformation.of(Wrapper.class);
    }
}

[Question Discussion]:

    Tags: serialization apache-kafka apache-flink


    [Solution 1]:

    For Flink to work, both your messages and your map function must be serializable.

    As far as I can tell, your messages look serializable.

    But your map function is not. It can sometimes be hard to make lambdas serializable. In your case, I think the problem is that parseAndSendToGraphProcessing is using objectMapper and janusGraphDataProcessing, which must therefore both be serializable as well.

    My guess is that janusGraphDataProcessing is not serializable (and neither is the ObjectMapper, unless you are on Jackson 2.1 or later, where it implements Serializable).

    If that is the case, then one workaround is to write a custom RichMapFunction class that stores janusGraphDataProcessing as a transient field and initializes it in its open() method.

    private MapFunction<String, Wrapper> parseAndSendToGraphProcessing = s ->{
        UserMatchingRequest userMatchingRequest = objectMapper.readValue(s, UserMatchingRequest.class);
        Wrapper wrapper = new Wrapper(janusGraphDataProcessing.handleMessage(userMatchingRequest));
        return wrapper;
    };
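    Whether a Java lambda is serializable depends entirely on what it captures. A minimal, self-contained sketch of the failure mode, using only JDK types (`NonSerializableClient` is a hypothetical stand-in for something like `janusGraphDataProcessing`):

    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.function.Function;

    public class LambdaCaptureDemo {
        // Hypothetical stand-in for a heavy, non-serializable dependency.
        static class NonSerializableClient {
            String handle(String s) { return "handled:" + s; }
        }

        // A lambda is only serializable if its target interface is Serializable
        // AND everything it captures is serializable too.
        interface SerializableFn extends Function<String, String>, Serializable {}

        private final NonSerializableClient client = new NonSerializableClient();

        SerializableFn makeFn() {
            // Referencing the field captures `this`, dragging the whole
            // (non-serializable) enclosing object into the closure.
            return s -> client.handle(s);
        }

        public static void main(String[] args) throws Exception {
            SerializableFn fn = new LambdaCaptureDemo().makeFn();
            try {
                new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(fn);
                System.out.println("serialized");
            } catch (NotSerializableException e) {
                // Same root cause that Flink surfaces as InvalidProgramException.
                System.out.println("not serializable: " + e.getMessage());
            }
        }
    }
    ```

    This is why "the inner classes implement Serializable" is not enough: the lambda's closure also pulls in the enclosing object that owns objectMapper and janusGraphDataProcessing.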
    

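    In plain Java terms, the RichMapFunction workaround relies on marking the dependency `transient` so it is skipped during serialization and rebuilt in `open()` on the receiving task. A sketch of that pattern with only JDK types (`GraphClient` and `RichStyleMapper` are hypothetical; in real Flink code you would extend `RichMapFunction<String, Wrapper>` and override `open(Configuration)`):

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientFieldDemo {
        // Hypothetical stand-in for a non-serializable dependency.
        static class GraphClient {
            String handle(String msg) { return "processed:" + msg; }
        }

        // Mirrors a RichMapFunction: the function object ships over the wire,
        // and the transient client is rebuilt by open() on the receiving side.
        static class RichStyleMapper implements Serializable {
            private transient GraphClient client; // skipped by serialization

            void open() { client = new GraphClient(); } // Flink calls open() per task

            String map(String s) { return client.handle(s); }
        }

        // Round-trip through Java serialization, as Flink does when it
        // distributes user functions to task managers.
        static RichStyleMapper ship(RichStyleMapper m) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(m);
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (RichStyleMapper) in.readObject();
            }
        }

        public static void main(String[] args) throws Exception {
            RichStyleMapper shipped = ship(new RichStyleMapper()); // works: client is transient
            shipped.open();                 // re-initialize after deserialization
            System.out.println(shipped.map("msg")); // prints "processed:msg"
        }
    }
    ```

    The same idea applies to the ObjectMapper: keep it transient (or static) and create it in open(), rather than capturing a pre-built instance in a lambda.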
    [Discussion]:
