【发布时间】:2021-11-12 03:16:09
【问题描述】:
我有一些 InputIoTMessage 类型的 Kafka 消息来自 Kafka,并通过 FlinkKafkaConsumer 消费,如下所示。如果有 NoSuchFieldException,我想在 InputIoTMessage 类中添加一个错误字段。此外,这是处理此类场景的最佳实践,还是我们在 Java 8 中有更优雅的东西,例如使用 Option 还是 Future?
String inputTopic = "sensors";
String outputTopic = "sensors_out";
String consumerGroup = "baeldung";
String address = "kafka:9092";
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<InputIoTMessage> flinkKafkaConsumer = createIoTConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<InputIoTMessage> stringInputStream = environment.addSource(flinkKafkaConsumer);
System.out.println("IoT Message received :: " );
stringInputStream
.filter((event) -> {
if(event.has("jsonParseError")) {
LOG.warn("JsonParseException was handled: " + event.get("jsonParseError").asText());
return false;
}
return true;
})
.print();
InputIoTMessage.java(有方法检查字段是否存在)
public boolean has(String fieldName) {
boolean isExists;
try {
isExists = fieldName.equalsIgnoreCase(this.getClass().getField(fieldName).getName());
} catch (NoSuchFieldException | SecurityException e) {
Field[] fieldArr = this.getClass().getDeclaredFields();
//Question: how to add "jsonParseError" field to the object here ?
}
return true;
}
【问题讨论】:
标签: apache-kafka apache-flink kafka-consumer-api flink-streaming