【问题标题】:python send csv data to spark streamingpython将csv数据发送到火花流
【发布时间】:2016-10-06 16:07:20
【问题描述】:

我想尝试在 python 中加载 csv 数据并通过 SPark Streaming 流式传输每一行火花。

我对网络东西很陌生。我不完全是我是否应该创建一个服务器 python 脚本,一旦它建立连接(使用火花流),它将开始发送每一行。在 Spark Streaming 文档中,他们执行 nc -l 9999 如果我正确,这是一个在端口 9999 上侦听的 netcat 服务器。所以我尝试创建一个类似的 python 脚本来解析 csv 并在端口 60000 上发送

import socket                   # Import socket module
import csv

 port = 60000                    # Reserve a port for your service.
 s = socket.socket()             # Create a socket object
 host = socket.gethostname()     # Get local machine name
 s.bind((host, port))            # Bind to the port
 s.listen(5)                     # Now wait for client connection.

 print('Server listening....')

 while True:
     conn, addr = s.accept()     # Establish connection with client.
     print('Got connection from', addr)



     csvfile = open('Titantic.csv', 'rb')

     reader = csv.reader(csvfile, delimiter = ',')
     for row in reader:
         line = ','.join(row)

         conn.send(line)
         print(line)

     csvfile.close()

     print('Done sending')
     conn.send('Thank you for connecting')
     conn.close()

Spark 流媒体脚本 -

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines_RDD = ssc.socketTextStream("localhost", 60000)

# Split each line into words
data_RDD = lines_RDD.flatMap(lambda line: line.split(","))

data_RDD.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

运行 spark 脚本时(顺便说一句,这是在 Jupyter Notebooks 中)我收到此错误 - IllegalArgumentException: '要求失败:没有注册输出操作,所以没有执行'

我不认为我正在正确地执行我的套接字脚本,但我不太确定该怎么做我基本上试图复制 nc -lk 9999 所做的事情,以便我可以通过端口发送文本数据,然后 spark 流正在监听它并接收数据并处理它。

任何帮助将不胜感激

【问题讨论】:

    标签: python sockets apache-spark streaming


    【解决方案1】:

    我正在尝试做类似的事情,但我想每 10 秒播放一次。我用这个脚本解决了:

    import socket
    from time import sleep
    
    host = 'localhost'
    port = 12345
    
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(1)
    while True:
        print('\nListening for a client at',host , port)
        conn, addr = s.accept()
        print('\nConnected by', addr)
        try:
            print('\nReading file...\n')
            with open('iris_test.csv') as f:
                for line in f:
                    out = line.encode('utf-8')
                    print('Sending line',line)
                    conn.send(out)
                    sleep(10)
                print('End Of Stream.')
        except socket.error:
            print ('Error Occured.\n\nClient disconnected.\n')
    conn.close()
    

    希望这会有所帮助。

    【讨论】:

      猜你喜欢
      • 2016-12-16
      • 1970-01-01
      • 1970-01-01
      • 2022-01-23
      • 2018-07-19
      • 1970-01-01
      • 2018-01-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多