【Problem Title】: Log4j not working in Spark Streaming's foreachRDD method
【Posted】: 2019-01-09 07:26:24
【Problem Description】:

While consuming Kafka with Spark Streaming on Spark 2.4, I found that log statements outside my foreachRDD method are printed, but the log statements inside foreachRDD are not. The logging API I use is Log4j, version 1.2.

I have already tried adding
spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties
spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties

to the spark-defaults.properties configuration file. At first I got the path wrong, and error messages about the log level and the path of the log configuration file were printed; that is how I know the spark.executor.extraJavaOptions and spark.driver.extraJavaOptions settings did take effect.
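A common way to make sure the same Log4j configuration actually reaches every node is to ship the properties file with the job itself. The following spark-submit sketch is my assumption (YARN mode; the local path, jar name and program arguments are illustrative, not taken from the question); `--files` copies log4j.properties into each container's working directory, where the `-Dlog4j.configuration` option can pick it up:

<code>
# Hedged sketch: distribute one log4j.properties to the driver and all executors.
# The local path, jar name and program arguments are placeholders.
spark-submit \
  --master yarn \
  --files /local/path/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --class com.example.KafkaSparkStreamingKafkaTests \
  my-app.jar arg0 arg1 arg2 consumer1
</code>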

【Question Discussion】:

    Tags: apache-spark log4j


    【Solution 1】:

    The logging inside and outside the foreach block executes on different machines: the code outside runs on the driver program, while the code inside runs on the executors. So if you want to see the logs produced inside the foreach block, go to YARN for the executor logs.
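    On YARN, whatever is logged inside the foreach block ends up in the executors' container logs, which the YARN CLI can retrieve. A small sketch of that lookup (the application id below is a placeholder):

    <code>
    # Fetch the aggregated container logs for the streaming application and
    # filter for the messages written inside foreachRDD/foreachPartition.
    yarn logs -applicationId application_1547000000000_0001 | grep -F '****'
    </code>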

    【Discussion】:

    • It seems I may be wrong. Could you show screenshots of the logs (the details and the YARN logs)? I think that would help us find the cause.
    【Solution 2】:
    <code>
        SLF4J: Class path contains multiple SLF4J bindings.
        SLF4J: Found binding in [jar:file:/vdir/mnt/disk2/hadoop/yarn/local/usercache/root/filecache/494/__spark_libs__3795396964941241866.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
        SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
        SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
        SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
        19/01/10 14:17:16 ERROR KafkaSparkStreamingKafkaTests: receive+++++++++++++++++++++++++++++++
    </code>
    
        My code:
    <code>
        // Snippet 1: driver-side entry point (KafkaSparkStreamingKafkaTests).
        // This statement runs on the driver, so it shows up in the driver log.
        if (args[3].equals("consumer1")) {
            logger.error("receive+++++++++++++++++++++++++++++++");
            SparkSQLService sparkSQLService = new SparkSQLService();
            consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer1");
            sparkSQLService.sparkForwardedToKafka(sparkConf,
                    CONSUMER_TOPIC,
                    PRODUCER_TOPIC,
                    new HashMap<String, Object>((Map) consumerProperties));
        ......

        // Snippet 2: the streaming service. The logging call inside
        // foreachPartition executes on the executors, not on the driver.
        public void sparkForwardedToKafka(SparkConf sparkConf, String consumerTopic, String producerTopic, Map<String, Object> kafkaConsumerParamsMap) {
            sparkConf.registerKryoClasses(new Class[]{SparkSQLService.class, FlatMapFunction.class, JavaPairInputDStream.class, Logger.class});
            JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.milliseconds(DURATION_SECONDS));
            Collection<String> topics = Arrays.asList(consumerTopic);
            JavaInputDStream<ConsumerRecord<String, String>> streams =
                    KafkaUtils.createDirectStream(
                            javaStreamingContext,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.Subscribe(topics, kafkaConsumerParamsMap)
                    );
            if (producerTopic != null) {
                JavaPairDStream<Long, String> messages = streams.mapToPair(record -> new Tuple2<>(record.timestamp(), record.value()));
                messages.foreachRDD(rdd ->
                        rdd.foreachPartition(partition ->
                                partition.forEachRemaining(tuple2 -> {
                                    // Runs in an executor JVM; this line goes to the
                                    // executor's log, not the driver console.
                                    LOGGER.error("****" + tuple2._1 + "|" + tuple2._2);
                                    KafkaService.getInstance().send(producerTopic, TaskContext.get().partitionId(), tuple2._1, null, tuple2._2);
                                })
                        )
                );
            }
        }
    </code>
    

    My logger declaration: private static final Logger LOGGER = LoggerFactory.getLogger(SparkSQLService.class);
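    For completeness, here is a minimal, self-contained sketch of why that declaration behaves differently inside the stream (the class name and sample data are hypothetical). The closure passed to foreachPartition is serialized to the executor JVMs, where any logger, static or locally obtained, is configured by that JVM's own log4j.properties, so its output lands in the executor container logs rather than the driver console:

    <code>
    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExecutorLoggingSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("executor-logging-sketch");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                sc.parallelize(Arrays.asList("a", "b", "c"))
                  .foreachPartition(partition -> {
                      // Resolved inside the task, i.e. in the executor JVM: this
                      // logger writes to the executor's container log (visible via
                      // `yarn logs`), not to the driver console.
                      Logger executorLog = LoggerFactory.getLogger(ExecutorLoggingSketch.class);
                      partition.forEachRemaining(value -> executorLog.error("****{}", value));
                  });
            }
        }
    }
    </code>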

    【Discussion】:
