【发布时间】:2020-05-01 00:47:48
【问题描述】:
我正在尝试使用以下命令从 Dataflow (Apache Beam) 写入 Confluent Cloud/Kafka:
kafkaKnowledgeGraphKVRecords.apply("Write to Kafka", KafkaIO.<String, String>write()
.withBootstrapServers("<mybootstrapserver>.confluent.cloud:9092")
.withTopic("testtopic").withKeySerializer(StringSerializer.class)
.withProducerConfigUpdates(props).withValueSerializer(StringSerializer.class));
Map<String, Object> props = new HashMap<>(); 的位置(即现在为空)
在日志中,我得到:send failed : 'Topic testtopic not present in metadata after 60000 ms.'
此集群上确实存在该主题 - 所以我的猜测是登录存在问题,这是有道理的,因为我找不到传递 APIKey 的方法。
我确实尝试了各种组合,通过上面的props 将我从 Confluent Cloud 获得的 APIKey/Secret 传递给身份验证,但我找不到有效的设置。
【问题讨论】:
-
“我确实尝试了各种组合来传递 APIKey/Secret” -> 你能更新你的问题以包括这些吗
-
stackoverflow.com/questions/53939658/… 显示了从 Beam 连接到 Confluent Cloud 的示例 - 它作为消费者,因此您需要针对适当的生产者配置进行更改,但属性应该相同
-
感谢@RobinMoffatt - 我尝试了与另一个答案中链接的参数类似的参数 - 也许我混淆了一些东西。我明天将尝试使用链接的答案,并在此处报告反馈。已经谢谢了!
-
@RobinMoffatt 再次感谢指点,我找到了解决方案,见下文
标签: java apache-kafka google-cloud-dataflow apache-beam confluent-cloud