【发布时间】:2019-04-13 09:26:33
【问题描述】:
我正在学习使用 Databricks 进行结构化流处理,但我正在努力使用 DataStreamWriter 控制台模式。
我的程序:
- 模拟文件流式到达文件夹“monitoring_dir”(每 10 秒从“source_dir”传输一个新文件)。
- 使用 DataStreamReader 使用每个新文件的内容填充 Unbounded DataFrame“inputUDF”。
- 使用 DataStreamWriter 将“inputUDF”的新行输出到有效的接收器。
虽然程序在选择使用文件接收器时有效(批处理附加到“result_dir”中的文本格式文件),但在选择控制台接收器时我看不到任何显示。
此外,当我在本地机器(安装了 Spark)上运行该程序的等效版本时,它适用于文件和控制台接收器。
我的问题是:
- 如何让这个程序在使用 Databricks 时输出到 Console sink 并显示结果?
提前非常感谢您!
最好的问候, 纳乔
我的程序:myTest.py
>import pyspark
import pyspark.sql.functions
import time
#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------
def get_source_dir_file_names(source_dir):
# 1. We create the output variable
res = []
# 2. We get the FileInfo representation of the files of source_dir
fileInfo_objects = dbutils.fs.ls(source_dir)
# 3. We traverse the fileInfo objects, to get the name of each file
for item in fileInfo_objects:
# 3.1. We get a string representation of the fileInfo
file_name = str(item)
# 3.2. We look for the pattern name= to remove all useless info from the start
lb_index = file_name.index("name='")
file_name = file_name[(lb_index + 6):]
# 3.3. We look for the pattern ') to remove all useless info from the end
ub_index = file_name.index("',")
file_name = file_name[:ub_index]
# 3.4. We append the name to the list
res.append(file_name)
# 4. We sort the list in alphabetic order
res.sort()
# 5. We return res
return res
#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
# 1. We get the names of the files on source_dir
files = get_source_dir_file_names(source_dir)
# 2. We get the starting time of the process
time.sleep(time_step_interval * 0.1)
start = time.time()
# 3. We set a counter in the amount of files being transferred
count = 0
# 4. We simulate the dynamic arriving of such these files from source_dir to dataset_dir
# (i.e, the files are moved one by one for each time period, simulating their generation).
for file in files:
# 4.1. We copy the file from source_dir to dataset_dir#
dbutils.fs.cp(source_dir + file, monitoring_dir + file)
# 4.2. We increase the counter, as we have transferred a new file
count = count + 1
# 4.3. We wait the desired transfer_interval until next time slot.
time.sleep((start + (count * time_step_interval)) - time.time())
# 5. We wait a last time_step_interval
time.sleep(time_step_interval)
#------------------------------------
# FUNCTION my_main
#------------------------------------
def my_main():
# 0. We set the mode
console_sink = True
# 1. We set the paths to the folders
source_dir = "/FileStore/tables/my_dataset/"
monitoring_dir = "/FileStore/tables/my_monitoring/"
checkpoint_dir = "/FileStore/tables/my_checkpoint/"
result_dir = "/FileStore/tables/my_result/"
dbutils.fs.rm(monitoring_dir, True)
dbutils.fs.rm(result_dir, True)
dbutils.fs.rm(checkpoint_dir, True)
dbutils.fs.mkdirs(monitoring_dir)
dbutils.fs.mkdirs(result_dir)
dbutils.fs.mkdirs(checkpoint_dir)
# 2. We configure the Spark Session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
inputUDF = spark.readStream.format("text")\
.load(monitoring_dir)
myDSW = None
# 4. Operation A1: We create the DataStreamWritter...
# 4.1. To either save to result_dir in append mode
if console_sink == False:
myDSW = inputUDF.writeStream.format("text")\
.option("path", result_dir) \
.option("checkpointLocation", checkpoint_dir)\
.trigger(processingTime="10 seconds")\
.outputMode("append")
# 4.2. Or to display by console in append mode
else:
myDSW = inputUDF.writeStream.format("console")\
.trigger(processingTime="10 seconds")\
.outputMode("append")
# 5. We get the StreamingQuery object derived from starting the DataStreamWriter
mySQ = myDSW.start()
# 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
streaming_simulation(source_dir, monitoring_dir, 10)
# 7. We stop the StreamingQuery to finish the application
mySQ.stop()
#-------------------------------
# MAIN ENTRY POINT
#-------------------------------strong text
if __name__ == '__main__':
my_main()
我的数据集:f1.txt
第一句话。
第二句。
我的数据集:f2.txt
第三句。
第四句。
我的数据集:f3.txt
第五句。
第六句。
【问题讨论】:
标签: python spark-streaming databricks spark-structured-streaming