【发布时间】:2023-03-16 05:22:02
【问题描述】:
我正在尝试找到一种方法来使用由 websocket 发送到 kafka 主题的消息(消息由 websocket 发送到地址 'ws://address:port/topic_name' 我想添加所有这些消息都发送到 kafka 主题)。 我阅读了有关 kafka connect 并试图找到一种方法来使用它,但它似乎不起作用...... 在此先感谢:)
【问题讨论】:
我正在尝试找到一种方法来使用由 websocket 发送到 kafka 主题的消息(消息由 websocket 发送到地址 'ws://address:port/topic_name' 我想添加所有这些消息都发送到 kafka 主题)。 我阅读了有关 kafka connect 并试图找到一种方法来使用它,但它似乎不起作用...... 在此先感谢:)
【问题讨论】:
Confluent 平台中没有连接到套接字的 Kafka 连接器。
我在一个在生产中使用 Kafka 的团队工作,我们的源是一个套接字,所以您的选择是使用支持此套接字的平台->Kafka 生产,或者自己编写一个。
关于可能的平台,我认为它们中的大多数都是矫枉过正的,尽管你可以利用它们来解决这个问题,一些选项是:
1. NiFi 或 MiniFi 对于较小的负载,请使用 PublishKafka Processor
2.StreamSets 与Kafka Producer Destination
3. Apache Flume- 不太推荐,这个项目是停止进化的。
如果您想编写自己的生产者,您基本上必须在此端口上创建一个侦听器,并将传入的消息生成到 Kafka;如果这是一个 Web 套接字,只需获取请求的有效负载并将它们生成到 Kafka。
可以从 tutorialspoint simple producer example* 复制示例 Kafka 生产者代码
以下是一些开源项目示例:
1.https://github.com/DataReply/kafka-connect-socket-source
2.https://github.com/kafka-socket/miniature_engine
3.https://github.com/dhanuka84/kafka-connect-tcp
4.https://github.com/krux/tcp-stream-kafka-producer
【讨论】:
Kafka 连接的想法是您有某种外部集成作为存储。这可以是 SAP、Salesforce、RDBMS、MQ 或任何其他有状态的东西。您的 websocket 端点没有数据,您不能轮询它是其他人在调用它,因此数据被传输。现在,如果您知道谁实际持有数据,那么您可以使用本指南构建连接器。 https://docs.confluent.io/current/connect/devguide.html
对于您的特定情况,您可以做的最好的事情是使用 Kafka Producer API https://docs.confluent.io/current/clients/producer.html
并且从您的 websocket enpoint 使用此生产者向主题发布消息,或者如果您使用 spring,您可以使用更高级别的抽象,这将是 KafkaTemplate https://docs.spring.io/spring-kafka/reference/html/#sending-messages。
【讨论】:
完全披露:我为 MigratorData 工作。
您可以查看MigratoryData's solution for Kafka。 MigratorData 是一个可扩展的 WebSocket 服务器。 Kafka 的 MigratoryData Source/Sink 连接器利用 Kafka Connect API,可用于将数据从 Kafka 实时流式传输到 WebSocket 客户端,反之亦然。该解决方案的主要优势在于它将 Kafka 消息传递扩展到 WebSocket 客户端,同时保留了 Kafka 的关键特性,如保证交付、消息排序等。
【讨论】: