【发布时间】:2021-01-11 04:12:29
【问题描述】:
我的目标是在 rabbitmq 交换队列和 kafka 主题之间建立一个连接器。
我按照本指南设置连接器:https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html。我从源代码下载并安装了连接器:https://github.com/apache/camel-kafka-connector,构建它并为camel-rabbitmq-kafka-connector 解压缩文件。我还将plugin.path 指向我在connect-standalone.properties 中解压缩camel-rabbitmq-kafka-connector jar 的文件夹。
我用于CamelRabbitSourceConnector 的参数如下:
name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1
# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# comma separated topics to send messages into
topics=mytopic
# mandatory properties (for a complete properties list see the connector documentation):
# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
我的 rabbitmq 的 docker run 命令如下所示:
docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management。对于 kafka,我使用了标准的“入门”指南。
使用 python Pika 库发送消息:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue',durable=True,auto_delete=True)
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')
如您所见,我发送消息时未在 channel.basic_publish 函数中指定 exchange 参数。如果我将它设置为等于camel.source.path.exchangeName,那么我的消息会在两者之间的某个地方丢失,所以也许我在这里遗漏了一些东西。
【问题讨论】:
-
您是否尝试过在 RabbitMQ 中将
myqueue显式绑定到myexchange? -
我不知道如何显式绑定它们。但是查看管理控制台,我看到
myexchange是direct类型,并且路由键与队列名称匹配,即myqueue。
标签: apache-kafka rabbitmq apache-camel apache-kafka-connect