【问题标题】:Flink handling Kafka messages with parsing errorFlink 处理带有解析错误的 Kafka 消息
【发布时间】:2021-11-12 03:16:09
【问题描述】:

我有一些 InputIoTMessage 类型的 Kafka 消息来自 Kafka,并通过 FlinkKafkaConsumer 消费,如下所示。如果有 NoSuchFieldException,我想在 InputIoTMessage 类中添加一个错误字段。此外,这是处理此类场景的最佳实践,还是我们在 Java 8 中有更优雅的东西,例如使用 Option 还是 Future?

    String inputTopic = "sensors";
    String outputTopic = "sensors_out";
    String consumerGroup = "baeldung";
    String address = "kafka:9092"; 

    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

    FlinkKafkaConsumer011<InputIoTMessage> flinkKafkaConsumer = createIoTConsumerForTopic(inputTopic, address, consumerGroup);
    flinkKafkaConsumer.setStartFromEarliest();
    

    DataStream<InputIoTMessage> stringInputStream = environment.addSource(flinkKafkaConsumer);
    System.out.println("IoT Message received :: " );
    
    stringInputStream
    .filter((event) -> {
        if(event.has("jsonParseError")) {
            LOG.warn("JsonParseException was handled: " + event.get("jsonParseError").asText());
            return false;
        }
        return true;
    })
    .print();

InputIoTMessage.java(有方法检查字段是否存在)

public boolean has(String fieldName) {
    boolean isExists;
    try {
        isExists = fieldName.equalsIgnoreCase(this.getClass().getField(fieldName).getName());
    } catch (NoSuchFieldException | SecurityException e) {
        Field[] fieldArr = this.getClass().getDeclaredFields();
                
        //Question: how to add "jsonParseError" field to the object here ?
    }
    return true;
}

【问题讨论】:

    标签: apache-kafka apache-flink kafka-consumer-api flink-streaming


    【解决方案1】:

    filter函数不会修改输入的记录,或许你可以实现flatMap函数,修改记录后通过out.collect输出

        stringInputStream.flatMap(new FlatMapFunction<InputIoTMessage, InputIoTMessage>() {
            @Override
            public void flatMap(InputIoTMessage input, Collector<InputIoTMessage> out) {
                if (!input.has("jsonParseError")) {
                    InputIoTMessage output = xxxxx;
                    out.collect(output);
                }
            }
        });
    

    【讨论】:

      猜你喜欢
      • 2018-11-29
      • 2021-08-11
      • 1970-01-01
      • 2017-07-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-21
      • 1970-01-01
      相关资源
      最近更新 更多