【问题标题】:Why Camel Kafka-Rabbitmq connector converts my messages to unreadable format?为什么 Camel Kafka-Rabbitmq 连接器会将我的消息转换为不可读的格式?
【发布时间】:2021-01-11 04:12:29
【问题描述】:

我的目标是在 rabbitmq 交换队列和 kafka 主题之间建立一个连接器。

我按照本指南设置连接器:https://camel.apache.org/camel-kafka-connector/latest/try-it-out-locally.html。我从源代码下载并安装了连接器:https://github.com/apache/camel-kafka-connector,构建它并为camel-rabbitmq-kafka-connector 解压缩文件。我还将plugin.path 指向我在connect-standalone.properties 中解压缩camel-rabbitmq-kafka-connector jar 的文件夹。

我用于CamelRabbitSourceConnector 的参数如下:

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into
topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.path.exchangeName=myexchange
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue

我的 rabbitmq 的 docker run 命令如下所示: docker run --rm -it --hostname myhostname -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:3-management。对于 kafka,我使用了标准的“入门”指南。

使用 python Pika 库发送消息:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue',durable=True,auto_delete=True)
channel.basic_publish(exchange='', routing_key='myqueue', body='some body...')

如您所见,我发送消息时未在 channel.basic_publish 函数中指定 exchange 参数。如果我将它设置为等于camel.source.path.exchangeName,那么我的消息会在两者之间的某个地方丢失,所以也许我在这里遗漏了一些东西。

【问题讨论】:

  • 您是否尝试过在 RabbitMQ 中将myqueue 显式绑定到myexchange
  • 我不知道如何显式绑定它们。但是查看管理控制台,我看到 myexchangedirect 类型,并且路由键与队列名称匹配,即 myqueue

标签: apache-kafka rabbitmq apache-camel apache-kafka-connect


【解决方案1】:

我能够通过将我的客户端更改为 Java:https://www.rabbitmq.com/tutorials/tutorial-one-java.html 而不是 python 一来解决此问题。

【讨论】:

  • 理解 python 的不同之处仍然很有趣;你能提供整个可运行的python源吗?
  • 我粘贴了我的 python 客户端的完整示例。我从与 Java 相同的指南中获取它:rabbitmq.com/tutorials/tutorial-one-python.html
【解决方案2】:

我能够使用以下属性使其工作:

name=CamelRabbitmqSourceConnector
connector.class=org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector
tasks.max=1

# use the kafka converters that better suit your needs, these are just defaults:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# comma separated topics to send messages into
topics=mytopic

# mandatory properties (for a complete properties list see the connector documentation):

# The exchange name determines the exchange to which the produced messages will be sent to. In the case of consumers, the exchange name determines the exchange the queue will be bound to.
camel.source.endpoint.hostname=myhostname
camel.source.endpoint.addresses=localhost:5672
camel.source.endpoint.queue=myqueue
camel.source.endpoint.autoDelete=false
camel.source.endpoint.skipExchangeDeclare=true
camel.source.endpoint.skipQueueBind=true

【讨论】:

    猜你喜欢
    • 2021-11-29
    • 1970-01-01
    • 2023-03-14
    • 2011-02-24
    • 1970-01-01
    • 2022-12-15
    • 2018-06-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多