【问题标题】:SparkStreaming, RabbitMQ and MQTT in python using pika使用 pika 在 python 中的 SparkStreaming、RabbitMQ 和 MQTT
【发布时间】:2016-10-18 05:49:53
【问题描述】:

为了让事情变得棘手,我想使用来自 rabbitMQ 队列的消息。现在我知道rabbit上有一个MQTT插件(https://www.rabbitmq.com/mqtt.html)。

但是,我似乎无法举例说明 Spark 使用 pika 生成的消息。

例如我在这里使用简单的 wordcount.py 程序 (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) 来查看是否可以通过以下方式看到消息 producer

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())

火花流消费者如下:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

但与简单的字数统计示例不同,我无法使其正常工作并出现以下错误:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

所以我的问题是,MQTTUtils.createStream(ssc, brokerUrl, topic) 的设置应该是什么以监听队列,是否有更完整的示例以及这些示例如何映射到 rabbitMQ 的示例。

我正在运行我的消费者代码:./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

我已按照一条评论的建议使用 TCP 参数更新了生产者代码如下:

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

火花流为:

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()

【问题讨论】:

    标签: python apache-spark rabbitmq mqtt pika


    【解决方案1】:

    MqttAsyncClient Javadoc,服务器 URI 必须具有以下方案之一:tcp://ssl://local://。您需要将上面的brokerUrl 更改为其中一种方案。

    更多信息,这里是MqttAsyncClient的来源链接:

    https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272

    【讨论】:

    • 我试图将生产者更改为使用 tcp 而不是 http,但是我发现我现在遇到了以下连接问题: ERROR ReceiverSupervisorImpl: Stopped receiver with error: Connection lost (32109) - java .net.SocketException:连接重置
    【解决方案2】:

    您似乎使用了错误的端口号。假设:

    • 您有一个使用默认设置运行的本地 RabbitMQ 实例,并且您已启用 MQTT 插件 (rabbitmq-plugins enable rabbitmq_mqtt) 并重新启动了 RabbitMQ 服务器
    • 在执行spark-submit / pyspark 时包含spark-streaming-mqtt(使用packagesjars / driver-class-path

    您可以使用 TCP 与tcp://localhost:1883 连接。您还必须记住 MQTT 使用的是amq.topic

    快速入门

    • 使用以下内容创建Dockerfile

      FROM rabbitmq:3-management
      
      RUN rabbitmq-plugins enable rabbitmq_mqtt
      
    • 构建 Docker 镜像:

      docker build -t rabbit_mqtt .
      
    • 启动镜像并等待服务器准备好:

      docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
      
    • 使用以下内容创建producer.py

      import pika
      import time 
      
      
      connection = pika.BlockingConnection(pika.ConnectionParameters(
          host='localhost'))
      channel = connection.channel()
      channel.exchange_declare(exchange='amq.topic',
                       type='topic', durable=True)
      
      for i in range(1000):
          channel.basic_publish(
              exchange='amq.topic',  # amq.topic as exchange
              routing_key='hello',   # Routing key used by producer
              body='Hello World {0}'.format(i)
          )
          time.sleep(3)
      
      connection.close()
      
    • 开始生产者

      python producer.py
      

      并访问管理控制台http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

      查看收到的消息。

    • 使用以下内容创建consumer.py

      from pyspark import SparkContext
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.mqtt import MQTTUtils
      
      sc = SparkContext()
      ssc = StreamingContext(sc, 10)
      
      mqttStream = MQTTUtils.createStream(
          ssc, 
          "tcp://localhost:1883",  # Note both port number and protocol
          "hello"                  # The same routing key as used by producer
      )
      mqttStream.count().pprint()
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
      
    • 下载依赖项(将 Scala 版本调整为用于构建 Spark 和 Spark 版本的版本):

      mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
      
    • 确保SPARK_HOMEPYTHONPATH 指向正确的目录。

    • 提交consumer.py with(如前调整版本):

      spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
      

    如果您按照所有步骤操作,您应该会在 Spark 日志中看到 Hello world 消息。

    【讨论】:

    • 谢谢。我会看看。这可以直接和主题​​一起使用吗?
    • MQTT 插件can be configured 使用不同的交换,但据我所知。无论如何,MQTT 协议并没有比这丰富多少。
    • 有没有办法在没有 docker 的情况下进行配置 - 例如使用 .config 文件。我已尝试使用rabbitmq.com/mqtt.html 中的默认设置。但这根本不起作用。在没有设置的情况下,我的 spark 侦听器可以连接以下内容:=INFO REPORT==== 5-Jul-2016::11:52:08 === 接受 MQTT 连接 (127.0.0.1:47868 - > 127.0.0.1:1883)。但是如何让产生的消息映射到这个端口上呢?
    • Docker 在这里并不重要,但我不太明白这个问题。端口不是消息的属性。 I 它是服务器的全局属性。如果主题和交流匹配,则不应有任何问题的理由。 “它不起作用”是什么意思?当您检查 RabbitMQ UI 时,您是否看到来自生产者的绑定?消费者呢?路由键匹配吗?
    • 我曾尝试使用我们使用的标准消息队列。我现在尝试使用没有队列的主题,这似乎开箱即用。但是没有使用 Docker。
    猜你喜欢
    • 1970-01-01
    • 2014-05-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-21
    • 2014-12-16
    • 2018-11-11
    相关资源
    最近更新 更多