【问题标题】:How to create multiple producer and consumer in kafka using python如何使用python在kafka中创建多个生产者和消费者
【发布时间】:2019-07-22 18:59:08
【问题描述】:

我正在尝试使用烧瓶应用程序中的 rtsp 链接从多个网络摄像头捕获流,并希望通过浏览器显示。为了实现这一点,我创建了两个单独的生产者,主题和两个消费者。启动 kafka 服务器并同时运行 consumer.py 和 producer.py 后,两个流仅运行两秒钟。

我想从多个网络摄像头捕获流

producer.py

import time
import sys
import cv2
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer2 = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'my-topic'
topic2 = 'my-topic2'


def emit_video():
    print('start emitting')
    camera = cv2.VideoCapture('rtsp://webcam1')
    camera2 = cv2.VideoCapture('rtsp://webcam2')

    while True:
        success, frame = camera.read()
        success2, frame2 = camera2.read()
        if not success2:
            print("camera issue")
        # png might be too large to emit
        else:
            data = cv2.imencode('.jpeg', frame)[1].tobytes()
            data2 = cv2.imencode('.jpeg', frame2)[1].tobytes()

            future = producer.send(topic, data)
            future2 = producer2.send(topic2, data2)
            try:
                future.get(timeout=60)
                future2.get(timeout=60)
            except KafkaError as e:
                print(e)
                break

            print('.', end='', flush=True)
            # to reduce CPU usage
            # time.sleep(0.2)
        # print()
    # video.release()

    print('done')


emit_video()

这是我的 consumer.py

from flask import Flask, Response,render_template
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')
consumer2 = KafkaConsumer('my-topic2', bootstrap_servers='localhost:9092')

app = Flask(__name__)


def kafkastream():
    for message in consumer:
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n\r\n')


def kafkastream2():
    print(threading.current_thread().getName())
    for message in consumer2:
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + message.value + b'\r\n\r\n')


@app.route('/video_feed')
def video_feed():
    return Response(kafkastream(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/video_feed2')
def video_feed2():
    return Response(kafkastream2(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/')
def index():
    return render_template('index.html')

if __name__ == '__main__':
    app.run(debug=True)

【问题讨论】:

  • 两分钟后停止工作会发生什么?如果future.get() 调用超时,我想知道你为什么要打这个电话,以便提出更好的替代方案。
  • 第一个摄像头流式传输仅运行两秒钟,然后两秒钟后,第二个摄像头才启动。之后,制片人。 py 显示“文件”src/producer. py",第 29 行,在 emit_video 数据 = cv2.imencode('.jpeg', frame)[1].tobytes() cv2.error: OpenCV(4.0.0) /io/opencv/modules/imgcodecs/src/grfmt_base .cpp:145: error: (-10:Unknown error code -10) Raw image encoder error: Empty JPEG image (DNL not supported) in function 'throwOnEror'"
  • 我可以使用同一个端口 9092 使用 consumer 和 consumer2 吗?
  • @ArifIbrahim 9092 是引导服务器的端口。让超过 1 个消费者连接到同一个引导服务器绝对没问题。
  • @GiorgosMyrianthous 非常感谢。现在我确认我的代码没问题。该问题是为使用 RTSP 链接而创建的。没有 rtsp 代码运行良好。

标签: python opencv flask apache-kafka


【解决方案1】:
data = cv2.imencode('.jpeg', frame)[1].tobytes()

我的示例代码

ret, buffer = cv2.imencode('.jpg', frame)

producer.send(topic, buffer.tobytes())


delete '[1]'

【讨论】:

    猜你喜欢
    • 2023-01-09
    • 1970-01-01
    • 2019-12-31
    • 1970-01-01
    • 1970-01-01
    • 2011-11-08
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    相关资源
    最近更新 更多