【发布时间】:2020-03-15 04:43:17
【问题描述】:
我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在A 类型的每条消息之后,我立即收到B... 类型之一,最终收到C 类型:
A [INFO] 2019-11-19T15:17:19.603Z <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR] 2019-11-19T15:17:19.605Z <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后又立即关闭它?
编辑
主题已经存在,
kafka-topics.sh --list显示它。我用过的所有客户端都有同样的问题:Kafka 的
kafka-console-producer.sh、kafka-python、confluent-kafka 和 kafkacatKafka 集群与我的所有其他计算机位于同一 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。
但是,它由 Amazon 的 Kafka 托管流 (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。 MSK 只是发布 zookeeper 和消息代理 URL 供客户端使用。
生产者作为 AWS Lambda 函数运行,但当我在普通 EC2 实例上运行时问题仍然存在。
权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少的权限)。
连接不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向 zookeepers 发出命令是有效的,而向消息代理发出命令总是最终失败。由于Kafka uses a binary protocol over TCP,我不知道如何进一步调试问题。
编辑
按照建议,我用
进行了调试./kafkacat -b $BROKERS -L -d 经纪人
得到:
7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN
那么,这是客户端和代理 API 版本之间的一种不匹配吗?记住我没有控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?
【问题讨论】:
-
您的目标主题是否已经创建?如果您也可以分享您的代码和配置,那将会很有帮助。
-
@GiorgosMyrianthous 不,不是,但有和没有
auto.create.topics.enable的问题是一样的。 -
尝试在运行您的生产者之前创建主题。它应该可以工作。
-
@GiorgosMyrianthous 我现在尝试独立地预先填充主题。问题是一样的。
-
只检查“ufw status”,如果您在 status 中获得任何内容,只需禁用 ufw 然后尝试相同。
标签: python amazon-web-services apache-kafka kafka-producer-api amazon-msk