【问题标题】:Can talk to Zookeeper but not to the message brokers可以与 Zookeeper 对话,但不能与消息代理对话
【发布时间】:2020-03-15 04:43:17
【问题描述】:

我正在使用 kafka-python 为 Kafka 2.2.1 集群(来自 AWS 的 MSK 服务的托管集群实例)生成消息。我能够检索引导服务器并与它们建立网络连接,但没有消息通过。相反,在A 类型的每条消息之后,我立即收到B... 类型之一,最终收到C 类型:

A [INFO]    2019-11-19T15:17:19.603Z    <BrokerConnection ... <connecting> [IPv4 ('10.0.128.56', 9094)]>: Connection complete.
B [ERROR]   2019-11-19T15:17:19.605Z    <BrokerConnection ... <connected> [IPv4 ('10.0.128.56', 9094)]>: socket disconnected
C [ERROR] KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

是什么导致代理节点接受来自有希望的生产者的 TCP 连接,但随后又立即关闭它?

编辑

  • 主题已经存在,kafka-topics.sh --list 显示它。

  • 我用过的所有客户端都有同样的问题:Kafka 的 kafka-console-producer.shkafka-pythonconfluent-kafkakafkacat

  • Kafka 集群与我的所有其他计算机位于同一 VPC 中,其安全组允许该 VPC 内的任何传入和传出流量。

  • 但是,它由 Amazon 的 Kafka 托管流 (MSK) 服务管理,这意味着我无法精细控制服务器安装设置(甚至不知道它们是什么)。 MSK 只是发布 zookeeper 和消息代理 URL 供客户端使用。

  • 生产者作为 AWS Lambda 函数运行,但当我在普通 EC2 实例上运行时问题仍然存在。

  • 权限不是问题。我已经为 lambda 角色分配了它需要的所有 AWS 权限(AWS 总是非常明确地说明哪个操作需要哪个缺少的权限)。

  • 连接不是问题。我可以使用标准 telnet 访问动物园管理员和消息代理的 URL。但是,向 zookeepers 发出命令是有效的,而向消息代理发出命令总是最终失败。由于Kafka uses a binary protocol over TCP,我不知道如何进一步调试问题。

编辑

按照建议,我用

进行了调试

./kafkacat -b $BROKERS -L -d 经纪人

得到:

7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1574772202.379|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1574772202.379|BROKERFAIL|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: failed: err: Local: Broker transport failure: (errno: Operation now in progress)
%7|1574772202.379|FEATURE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Updated enabled protocol features -ApiVersion to
%7|1574772202.380|STATE|rdkafka#producer-1| [thrd:HOSTNAME]: HOSTNAME:9094/bootstrap: Broker changed state APIVERSION_QUERY -> DOWN

那么,这是客户端和代理 API 版本之间的一种不匹配吗?记住我没有控制 AWS 提供的 Kafka 集群的版本或配置,我该如何从中恢复?

【问题讨论】:

  • 您的目标主题是否已经创建?如果您也可以分享您的代码和配置,那将会很有帮助。
  • @GiorgosMyrianthous 不,不是,但有和没有auto.create.topics.enable 的问题是一样的。
  • 尝试在运行您的生产者之前创建主题。它应该可以工作。
  • @GiorgosMyrianthous 我现在尝试独立地预先填充主题。问题是一样的。
  • 只检查“ufw status”,如果您在 status 中获得任何内容,只需禁用 ufw 然后尝试相同。

标签: python amazon-web-services apache-kafka kafka-producer-api amazon-msk


【解决方案1】:

我认为这与 TLS 加密有关。默认情况下,MSK 启动一个同时接受 PLAINTEXT 和 TLS 的集群,但如果您以编程方式从集群中获取引导服务器,它只会为您提供 TLS 端口。如果您遇到这种情况,请尝试使用 PLAINTEXT 端口 9092。

要对 TLS 客户端进行身份验证,您需要生成一个证书:https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html,然后需要将此证书放入您的 lambda 并在您的 Producer 配置中引用该证书。

如果您只能将 MSK 集群配置为 PLAINTEXT,那么当您从 AWS SDK 获取引导服务器时,它将为您提供 PLAINTEXT 端口,您应该会很好。

【讨论】:

  • 嗯。我可以连接到 9094 端口,但是 9092 端口超时。此外,get_bootstrap_brokers() 调用返回 BootstrapBrokersTls,但没有普通的 BootstrapBrokers。我想这意味着集群实际上不允许纯文本连接。所以我必须要么配置一个证书,要么让集群在没有证书的情况下进行通信,对吧?
  • 基于此,您可能需要执行$ export KAFKA_OPTS="-Djavax.net.debug=all",运行kafka-console-producer ...,并检查任何 TLS 问题的跟踪。
  • @KilianFoth,是的,没错。仅当您不需要加密或生成证书并将其添加到您的 lambda 部署包时,才将集群重新部署为 PLAINTEXT。当我们进行此操作时,我们使用了 PLAINTEXT 选项,因为集群仅在内部并且位于 VPC 内部。
【解决方案2】:

由于它也不适用于非 python 客户端,因此它不太可能是库中的错误。

这似乎是一个网络问题

有一个名为 advertised.listeners 的 kafka 代理设置,它指定客户端在第一次连接后将使用的地址。换句话说,这就是客户端消费或生产时发生的情况:

  1. 使用bootstrap.servers,建立第一个连接并要求使用真实地址。

  2. 代理返回代理配置中advertised.listeners 指定的地址。

  3. 客户端尝试使用该新地址进行消费或生产。

这是一项安全功能,可防止可公开访问的代理被不应访问的客户端消费/生产。

如何诊断

运行以下命令:

$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L

返回

Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap):
1 brokers:
  broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092

在这种情况下,ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 是客户端指定的地址,即使客户端可以访问该地址/端口,ip-172-31-18-160.us-west-2.compute.internal:9092 也将是用于消费/生产的地址。

现在,如果您在 AWS MSK 中运行 kafka,它可能会为您管理它。您必须确保您可以访问该命令返回的地址。如果不这样做,您可能需要更改它或从有权访问它的主机运行您的命令。

另一种选择可能是使用可以在内部访问该地址的堡垒主机打开 ssh 隧道。

您可以在以下位置找到更多详细信息:https://rmoff.net/2018/08/02/kafka-listeners-explained

【讨论】:

  • 好吧,./kafkacat -b $BROKERS -L 给了我:“% 错误:无法获取元数据:本地:超时”。问题是,虽然我已经允许进出客户端主机的所有网络流量,但我看不到如何允许进出 Kafka 消息代理主机的更多连接,因为 AWS 为我管理这些。
猜你喜欢
  • 2020-04-25
  • 2013-08-13
  • 1970-01-01
  • 2019-02-21
  • 1970-01-01
  • 2011-08-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多