【问题标题】:How to create python kafka producer with multiple bootstrap servers?如何使用多个引导服务器创建 python kafka 生产者?
【发布时间】:2021-09-03 13:13:08
【问题描述】:

我正在尝试使用向多个代理支持的 Kafka 主题发送消息。下面是我的方法,其中self.bootstrap_servers = "[b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092, b-2.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092]"

from kafka import KafkaProducer
from time import sleep
    def send(self):
        try:
            producer = KafkaProducer(bootstrap_servers=self.bootstrap_server, api_version=(0, 10, 1))
            message = self.prepare_message()
            producer.send(self.topic, self.encoded_payload)
            sleep(2)
        except:
            print("Exception sending message to kafka")

当我尝试对此进行测试时,我进入了异常块,失败的那一行是我创建生产者的时候。如何使用这个 KafkaProducer 发布 Kafka 消息,它有多个支持它的 kafka bootstrap_servers?有没有其他方法可以做到这一点?

错误:

  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\client_async.py", line 216, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1236, in collect_hosts
    host, port, afi = get_ip_port_afi(host_port)
  File "C:\Users\ENV\datareplayer\venv\lib\site-packages\kafka\conn.py", line 1192, in get_ip_port_afi
    host, rest = host_and_port_str[1:].split(']')
ValueError: not enough values to unpack (expected 2, got 1)

【问题讨论】:

  • 错误是什么?
  • 你为什么认为你需要方括号?这不在任何示例中
  • 我找到的例子都是方括号。它应该只是 bootstrap_server 的字符串吗?

标签: python apache-kafka


【解决方案1】:

关于向具有多个brokers的kafka集群发送数据, 引导服务器只是初始连接点,它从引导服务器元数据中获取数据领导者所在的位置并连接到适当的代理......因此您需要与集群中的所有代理建立网络连接,这并不重要选择哪个代理作为引导服务器

话虽这么说...推荐的方法是使用逗号分隔的列表作为引导服务器,因此如果一个关闭,它将使用列表中的下一个...

关于在python kafka的设置中指定引导服务器列表

您使用的是哪些 python kafka 库?

这件事似乎还有一个未解决的未解决问题。

https://github.com/confluentinc/confluent-kafka-python/issues/711

说明 配置键 bootstrap.servers 意味着可以指定多个服务器,但没有明确定义该键的值应该是什么。此配置键似乎只允许使用逗号分隔的字符串。如果给出了 Python 列表,则不会给出明确的错误或反馈来表明这是无效的。相反,对 consumer.poll() 或其他代理绑定函数的调用只会阻塞并且永远不会成功。

理想情况下,Python 列表(例如 ["broker1", "broker2"])应该是有效的;或者至少,构造消费者应该抛出运行时/配置异常。

【讨论】:

  • OP 似乎在使用kafka-python
  • 是的,我正在使用kafka-python
  • 尝试去掉列表元素之间的空格再检查
  • self.bootstrap_servers = ['b-1.dev-stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092', 'b-2.dev- stg-kafka.wjiw3s.c1.kafka.us-east-1.amazonaws.com:9092']
【解决方案2】:

ast.literal_eval(bootstrap_servers) 可以帮忙。

它将字符串化列表作为输入,并将其转换为列表。例如:

input_str = "['broker1', 'broker2', 'broker3']"

type(input_str)
str

如果你这样做

input_list = ast.literal_eval(input_str)

input_list 现在是一个列表

type(input_list)
list

【讨论】:

  • 正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center
猜你喜欢
  • 2019-07-22
  • 1970-01-01
  • 2014-02-18
  • 2020-03-30
  • 2020-05-28
  • 1970-01-01
  • 2023-01-09
  • 2020-06-19
  • 2021-11-14
相关资源
最近更新 更多