【Question Title】:How to suppress stdout 'batch' when streaming spark?
【Posted】:2020-11-16 14:47:56
【Question】:

How can I change or completely suppress this batch metadata and show only my own output?

-------------------------------------------
Batch: 62
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [32]|transaction|        0|335793|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------
Batch: 63
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value|      topic|partition|offset|           timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [33]|transaction|        0|335794|2020-07-27 15:10:...|            0|
+----+-----+-----------+---------+------+--------------------+-------------+

-------------------------------------------

spark_job.py code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time
KAFKA_TOPIC_NAME_CONS = "transaction"
KAFKA_BOOTSTRAP_SERVERS_CONS = '127.0.0.1:9092'
project_path = 'C:/Users/Admin/Desktop/kafka_project'

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Demo Application Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars",                    "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraLibrary",   "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.driver.extraClassPath",   "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    incoming_read = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_TOPIC_NAME_CONS)\
        .option("startingOffsets", "earliest")\
        .load()

    query1 = incoming_read.writeStream.format("console").start()
    time.sleep(4)

    # Block until the streaming query stops or fails.
    # Note: incoming_read is a DataFrame, not a query, so calling
    # incoming_read.awaitTermination() would raise an AttributeError.
    query1.awaitTermination()
    
    
    print("PySpark demo app finished.")

producer.py sends the numbers 0 through 7 forever, at 4-second intervals:

# coding=utf8
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8')
                         )

topic = 'transaction'

while True:
    print('restarting the loop...')
    for i in range(7):
        print('producing for this topic %s this blob: %s ' % (topic,i))

        producer.send(topic, value=i)
        sleep(1)

Also, how can I actually get to see the last line, "PySpark demo app finished."?

Do I need to stop the producer and wait for spark.py to time out? Using Spark 2.4.6 and Python 3.7.

【Comments】:

  • Hi @ERJAN, does the answer below answer your question?

标签: apache-spark pyspark apache-kafka spark-streaming-kafka


【Solution 1】:

Looking at the code of the ConsoleWrite class, the "Batch" output cannot be suppressed or changed when using the console sink, because it appears to be hard-coded:

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // We have to print a "Batch" label for the epoch for compatibility 
    // with the pre-data source V2 behavior.
    printRows(messages, schema, s"Batch: $epochId")
  }

  protected def printRows(
      commitMessages: Array[WriterCommitMessage],
      schema: StructType,
      printMessage: String): Unit = {
[...]
    // scalastyle:off println
    println("-------------------------------------------")
    println(printMessage)
    println("-------------------------------------------")
    // scalastyle:on println
[...]
  }

Unless there is some way to suppress the println statements, that is.
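A common workaround (not part of the original answer, and assuming Spark 2.4+ where `foreachBatch` is available in PySpark) is to stop using the built-in console sink and print each micro-batch yourself, so the hard-coded banner never runs. A minimal sketch, with the hypothetical callback name `show_batch`:

```python
# Sketch: replace the console sink with foreachBatch so no "Batch: N"
# banner is printed -- only the rows themselves.
def show_batch(batch_df, epoch_id):
    # batch_df is a normal (non-streaming) DataFrame for this micro-batch;
    # print it however you like, with no banner around it.
    batch_df.show(truncate=False)

# In the streaming job, instead of .format("console"):
# query1 = incoming_read.writeStream.foreachBatch(show_batch).start()
```

Since `foreachBatch` hands you a plain DataFrame, you can also transform or filter it before printing, which the console sink does not allow.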

Also, how can I actually get to see the last line, "PySpark demo app finished."?

You need to terminate your application, either by sending it a SIGTERM signal or by killing the submitted application directly. I am not sure whether you will then get to see the print statement on the last line of your code. The producer has nothing to do with this.
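Alternatively, instead of killing the process from the outside, the script itself can bound the stream's lifetime: `StreamingQuery.awaitTermination(timeout)` returns after at most `timeout` seconds (returning `True` only if the query actually terminated), after which you can call `query.stop()`. A minimal sketch, where `run_for` and `RUN_SECONDS` are hypothetical names chosen here:

```python
# Sketch: let the stream run for a fixed time, then stop it cleanly
# so the script falls through to its final print statement.
RUN_SECONDS = 30  # assumption: how long to let the stream run

def run_for(query, seconds):
    """Wait up to `seconds` for the query; stop it if it is still running."""
    finished = query.awaitTermination(seconds)  # True if it terminated itself
    if not finished:
        query.stop()

# In the streaming job:
# run_for(query1, RUN_SECONDS)
# print("PySpark demo app finished.")
```

With this, the final print is reached without touching the producer at all.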

【Discussion】:
