【问题标题】:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster using jaas SASL config authenticationorg.apache.kafka.common.errors.TimeoutException:使用 jaas SASL 配置身份验证获取 Kafka 集群的主题元数据时超时
【发布时间】:2020-08-07 09:16:51
【问题描述】:

我正在尝试部署一个 Google Cloud Dataflow 管道,该管道从 Kafka 集群中读取数据,处理其记录,然后将结果写入 BigQuery。但是,我在尝试部署时一直遇到以下异常:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster

Kafka 集群需要使用 JAAS 配置进行身份验证,我使用下面的代码来设置 KafkaIO.read Apache Beam 方法所需的属性:

// Kafka properties
    Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
        put("request.timeout.ms", 900000);
        put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
        put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
    }};

    // Build & execute pipeline
    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers(properties.getProperty("kafka.servers"))
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))

要在禁用公共 IP 的情况下部署 Dataflow 管道,但从我们的 Google Cloud VPC 网络到 Kafka 集群已建立 VPN 隧道,并且配置了双方私有 IP 所需的路由,并且它们的 IP 已列入白名单.我能够使用与要部署的 Dataflow 作业位于同一 VPN 子网中的 Compute Engine VM ping 并连接到 Kafka 服务器的套接字。

我认为配置存在问题,但我无法确定我是否缺少其他字段,或者现有字段之一是否配置错误。有谁知道我如何进一步诊断问题,因为抛出的异常并没有真正确定问题?任何帮助将不胜感激。

编辑: 我现在能够成功部署 Dataflow 作业,但是看起来好像读取功能不正常。在查看日志检查Dataflow作业中的错误后,我可以看到在发现kafka主题的组协调器后,在警告日志语句之前没有其他日志语句说关闭空闲读取器超时:

Close timed out with 1 pending requests to coordinator, terminating client connections

随后出现未捕获的异常,根本原因是:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined

然后出现错误说明:

Execution of work for P0 for key 0000000000000001 failed. Will retry locally.

这可能是键定义的问题,因为 Kafka 主题实际上在消息中没有键吗?当我在 Kafka Tool 中查看主题时,在数据中观察到的唯一列由 Offset、Message 和 Timestamp 组成。

【问题讨论】:

  • 您能否确认在 Kafka 代理上禁用了 SSL 加密,因为您已经在消费者端传播了 SASL_PLAINTEXT 参数?
  • @mk_sta 是的,我可以确认 Kafka 代理已禁用 SSL 加密。
  • @mk_sta 我还更新了问题,因为我现在可以成功部署作业,但现在我在尝试从 Kafka 主题读取和尝试关闭空闲读取器时遇到问题。
  • 从kafka读取你的部分代码不包含任何进一步的处理代码,能否扩展管道记录进一步分析问题?如果通过DataflowRunner 运行您的管道,我建议出于测试目的切换到DirectRunner 以验证问题是否存在于您的本地环境中。
  • 所以在进一步调试之后,问题似乎是由于 Dataflow 作业在使用直接登录到 Kafka 集群后仍会尝试连接到 Kafka 组协调器的公共 IP私有引导 IP。 Dataflow 作业目前配置为仅使用私有 ip,以节省成本,因此无法连接集群的公共 IP。我现在正在尝试寻找一种解决方案来绕过连接到组协调器的公共 IP。

标签: java google-cloud-platform apache-kafka google-cloud-dataflow apache-beam


【解决方案1】:

根据最后一条评论,我假设您遇到的问题更多是网络堆栈,然后是最初寻找 Dataflow 管道中缺少的任何配置,就执行 Dataflow 作业运行器与 Kafka 代理的连接而言。

基本上,当您为 Dataflow 工作人员使用 Public IP 地址池时,您有一种最简单的方式来访问外部 Kafka 集群,而无需在双方都应用额外配置,因为您不需要在各方之间启动 VPC network 和执行例行网络作业以使所有路由正常工作。

但是,Cloud VPN 在双方实施 VPC 网络以及进一步调整此 VPC 的 VPN 网关、转发规则和地址池时带来了更多复杂性。相反,从 Dataflow 运行时的角度来看,您不需要在 Dataflow 运行器之间传播公共 IP 地址,这无疑会降低价格。

您提到的主要问题在于 Kafka 集群方面。由于Apache Kafka是一个分布式系统,它的核心原理是:当生产者/消费者执行时,它会请求关于哪个broker是分区的leader的元数据,接收具有该分区可用端点的元数据,因此客户端然后确认这些端点连接到特定的代理。据我了解,在您的情况下,与领导者的连接是通过绑定到外部网络接口的 listener 执行的,在server.properties broker setting 中配置。

因此,您可能会考虑在listeners 中创建一个单独的侦听器(如果它不存在),绑定到云 VPC 网络接口,并在必要时传播advertised.listeners 与返回客户端的元数据,其中包含用于连接到特定的经纪人。

【讨论】:

    猜你喜欢
    • 2021-07-21
    • 1970-01-01
    • 1970-01-01
    • 2017-09-14
    • 2021-10-07
    • 1970-01-01
    • 2019-06-12
    • 2016-08-24
    • 2022-11-16
    相关资源
    最近更新 更多