[Posted]: 2020-11-16 14:47:56
[Problem description]:
How can I change or completely suppress this per-batch metadata and show only my own data?
-------------------------------------------
Batch: 62
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value| topic|partition|offset| timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [32]|transaction| 0|335793|2020-07-27 15:10:...| 0|
+----+-----+-----------+---------+------+--------------------+-------------+
-------------------------------------------
Batch: 63
-------------------------------------------
+----+-----+-----------+---------+------+--------------------+-------------+
| key|value| topic|partition|offset| timestamp|timestampType|
+----+-----+-----------+---------+------+--------------------+-------------+
|null| [33]|transaction| 0|335794|2020-07-27 15:10:...| 0|
+----+-----+-----------+---------+------+--------------------+-------------+
-------------------------------------------
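As a side note on reading this output: the console sink prints the binary `value` column as hex bytes, and the producer below serializes each number with `json.dumps(...).encode('utf-8')`. A small sketch showing why the integer 2 appears as `[32]` (0x32 is the ASCII code of the character "2"):

```python
from json import dumps

# The producer serializes each number as JSON text and UTF-8-encodes it,
# so sending the integer 2 puts the single byte b"2" on the topic.
payload = dumps(2).encode("utf-8")

# Spark's console sink renders the binary `value` column as hex bytes,
# which is why 2 shows up as [32].
print(payload.hex())  # prints "32"
```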
spark_job.py code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import time

KAFKA_TOPIC_NAME_CONS = "transaction"
KAFKA_BOOTSTRAP_SERVERS_CONS = '127.0.0.1:9092'
project_path = 'C:/Users/Admin/Desktop/kafka_project'

if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Demo Application Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka Demo") \
        .master("local[*]") \
        .config("spark.jars", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.executor.extraLibrary", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .config("spark.driver.extraClassPath", "file:///C://Users//Admin//Desktop//kafka_project//spark-sql-kafka-0-10_2.11-2.4.6.jar,file:///C://Users//Admin//Desktop//kafka_project//kafka-clients-2.4.1.jar") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    incoming_read = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
        .option("startingOffsets", "earliest") \
        .load()

    query1 = incoming_read.writeStream.format("console").start()
    time.sleep(4)
    query1.awaitTermination()
    incoming_read.awaitTermination()
    print("PySpark demo app finished.")
producer.py sends the numbers 0 through 7 forever, at 4-second intervals:
# coding=utf8
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

topic = 'transaction'
while True:
    print('restarting the loop...')
    for i in range(7):
        print('producing for this topic %s this blob: %s ' % (topic, i))
        producer.send(topic, value=i)
        sleep(1)
Also, how do I actually get to see the last line, "PySpark demo app finished."?
Do I need to stop the producer and wait for spark_job.py to time out? Using Spark 2.4.6, Python 3.7.
[Discussion]:
- Hi @ERJAN, does the answer below answer your question?
Tags: apache-spark pyspark apache-kafka spark-streaming-kafka