【发布时间】:2020-08-07 09:16:51
【问题描述】:
我正在尝试部署一个 Google Cloud Dataflow 管道,该管道从 Kafka 集群中读取数据,处理其记录,然后将结果写入 BigQuery。但是,我在尝试部署时一直遇到以下异常:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster
Kafka 集群需要使用 JAAS 配置进行身份验证,我使用下面的代码来设置 KafkaIO.read Apache Beam 方法所需的属性:
// Kafka properties
Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
put("request.timeout.ms", 900000);
put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
}};
// Build & execute pipeline
pipeline
.apply(
"ReadFromKafka",
KafkaIO.<Long, String>read()
.withBootstrapServers(properties.getProperty("kafka.servers"))
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))
要在禁用公共 IP 的情况下部署 Dataflow 管道,但从我们的 Google Cloud VPC 网络到 Kafka 集群已建立 VPN 隧道,并且配置了双方私有 IP 所需的路由,并且它们的 IP 已列入白名单.我能够使用与要部署的 Dataflow 作业位于同一 VPN 子网中的 Compute Engine VM ping 并连接到 Kafka 服务器的套接字。
我认为配置存在问题,但我无法确定我是否缺少其他字段,或者现有字段之一是否配置错误。有谁知道我如何进一步诊断问题,因为抛出的异常并没有真正确定问题?任何帮助将不胜感激。
编辑: 我现在能够成功部署 Dataflow 作业,但是看起来好像读取功能不正常。在查看日志检查Dataflow作业中的错误后,我可以看到在发现kafka主题的组协调器后,在警告日志语句之前没有其他日志语句说关闭空闲读取器超时:
Close timed out with 1 pending requests to coordinator, terminating client connections
随后出现未捕获的异常,根本原因是:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined
然后出现错误说明:
Execution of work for P0 for key 0000000000000001 failed. Will retry locally.
这可能是键定义的问题,因为 Kafka 主题实际上在消息中没有键吗?当我在 Kafka Tool 中查看主题时,在数据中观察到的唯一列由 Offset、Message 和 Timestamp 组成。
【问题讨论】:
-
您能否确认在 Kafka 代理上禁用了 SSL 加密,因为您已经在消费者端传播了
SASL_PLAINTEXT参数? -
@mk_sta 是的,我可以确认 Kafka 代理已禁用 SSL 加密。
-
@mk_sta 我还更新了问题,因为我现在可以成功部署作业,但现在我在尝试从 Kafka 主题读取和尝试关闭空闲读取器时遇到问题。
-
从kafka读取你的部分代码不包含任何进一步的处理代码,能否扩展管道记录进一步分析问题?如果通过
DataflowRunner运行您的管道,我建议出于测试目的切换到DirectRunner以验证问题是否存在于您的本地环境中。 -
所以在进一步调试之后,问题似乎是由于 Dataflow 作业在使用直接登录到 Kafka 集群后仍会尝试连接到 Kafka 组协调器的公共 IP私有引导 IP。 Dataflow 作业目前配置为仅使用私有 ip,以节省成本,因此无法连接集群的公共 IP。我现在正在尝试寻找一种解决方案来绕过连接到组协调器的公共 IP。
标签: java google-cloud-platform apache-kafka google-cloud-dataflow apache-beam