[Posted]: 2018-12-18 02:37:13
[Problem description]:
I am trying to write producer and consumer code with Kafka, Spark Streaming and Python; the scenario is the following: there is a producer of random odometry-related messages in JSON format, which uses a timer thread to publish a message on a topic every 3 seconds:
from kafka import KafkaProducer
from kafka.errors import KafkaError
import threading
import random
from random import randint
import json
import logging
import math

log = logging.getLogger(__name__)

def sendMessage():
    # the function re-arms itself, so a message is sent every 3 seconds
    threading.Timer(3.0, sendMessage).start()
    # connection with the message broker
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda m: json.dumps(m).encode('ascii'))
    # the id is initially fixed to 1, but there could be more robots
    robotId = 1
    # generation of random odometry values
    deltaSpace = randint(1, 9)
    thetaTwist = random.uniform(0, math.pi * 2)
    future = producer.send('odometry', key=b'message',
                           value={'robotId': robotId, 'deltaSpace': deltaSpace,
                                  'thetaTwist': thetaTwist}
                           ).add_callback(on_send_success).add_errback(on_send_error)
    # block for 'synchronous' sends
    try:
        record_metadata = future.get(timeout=10)
    except KafkaError:
        # decide what to do if the produce request failed...
        log.exception("produce request failed")
    producer.flush()

def on_send_success(record_metadata):
    print("topic name: " + record_metadata.topic)
    print("partition: " + str(record_metadata.partition))
    print("offset: " + str(record_metadata.offset))

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle the exception

sendMessage()
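For reference, each published message is a small JSON document of the form {"robotId": 1, "deltaSpace": 5, "thetaTwist": 2.356} (illustrative values). As a quick sanity check that the producer actually publishes, a plain kafka-python consumer can print whatever lands on the topic; this is a minimal sketch that reuses the topic name and broker address from the code above:

from kafka import KafkaConsumer
import json

# subscribe to the same topic the producer writes to and decode the JSON values
consumer = KafkaConsumer('odometry',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    print(message.key, message.value)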
Then there is a consumer that reads the messages from the same topic in 3-second batches and processes them with Spark Streaming; this is the code:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

# create a local StreamingContext with two worker threads and a batch interval of 3 seconds
sc = SparkContext("local[2]", "OdometryConsumer")
ssc = StreamingContext(sc, 3)

kafkaStream = KafkaUtils.createDirectStream(ssc, ['odometry'], {'metadata.broker.list': 'localhost:9092'})
# the direct stream yields (key, value) pairs, so only the value is parsed
parsed = kafkaStream.map(lambda kv: json.loads(kv[1]))

def f(x):
    print(x)

fore = parsed.foreachRDD(f)

ssc.start()             # start the computation
ssc.awaitTermination()  # wait for the computation to terminate
To run the application, I start the ZooKeeper server on port 2181:
sudo /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
Then I start the Kafka server/broker on port 9092:
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
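To confirm that the broker is up and the topic exists (depending on the broker configuration, topics may also be auto-created on first use), the topic list can be checked with the tool shipped with Kafka; a sketch assuming the same /opt/kafka installation as above:

/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181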
Then I start the producer and the consumer:
python3 Producer.py
./spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.3.1.jar /home/erca/Scrivania/proveTesi/SparkConsumer.py
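Independently of Spark, the console consumer bundled with Kafka can show whether messages are actually reaching the topic; a sketch assuming the same installation paths as above:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic odometry --from-beginning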
The application runs without errors, but I am not sure whether the messages are actually being consumed; what can I do to verify that? Thanks to anyone who can help!
[Comments]:
- In your function f, write x.take(10) instead.
- To clarify the comment above: printing an RDD does not invoke any action, so Spark does nothing. Also, a plain Kafka producer/consumer would be easier to test with.
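Following that suggestion, a callback that actually forces each batch to be computed might look like this (a sketch: take() is an action, while print(x) alone only prints the RDD object on the driver):

def f(rdd):
    # take() triggers computation of the batch; printing the RDD object does not
    for record in rdd.take(10):
        print(record)

parsed.foreachRDD(f)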
Tags: python apache-spark stream apache-kafka spark-streaming