【问题标题】:How to perform logging in Flink jobs?如何在 Flink 作业中执行日志记录?
【发布时间】:2022-01-20 16:04:33
【问题描述】:

我正在研究一个非常简单的用例,我想检查DataStream 中的数据。我想了解是否有更好的日志记录方式。因为下面的日志记录方式看起来很丑陋,并且增加了一个额外的阶段。

DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
        stream.map(conversation -> {
            logger.info("Read from Kafka source conversationId: {} and content: {}",conversation.id,conversation.time);
            return conversation;
        });

【问题讨论】:

    标签: logging apache-flink


    【解决方案1】:

    或许你可以实现一个mapfunction类,然后在类中打印日志

    DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
                stream.map(new MyMapFunction()
                });
    
    public class MyMapFunction extends RichMapFunction<T> {
        @Override
        public void open(Configuration parameters) throws Exception {
        }
    
        @Override
        public T map(...) throws Exception {
            logger.info(xxxx);
            return xxx;
        }
    }
    

    你也可以直接使用printsink

    DataStream<Conversation> stream = env.addSource(kafkaConsumer.getKafkaConsumer());
            stream.print()
    

    【讨论】:

    • 在map里面登录和使用MapFunction有什么区别?
    • 打印流打印到控制台不是吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-27
    • 2013-08-01
    • 1970-01-01
    相关资源
    最近更新 更多