【问题标题】:Is it possible sending websocket messages to a kafka topic?是否可以将 websocket 消息发送到 kafka 主题?
【发布时间】:2023-03-16 05:22:02
【问题描述】:

我正在尝试找到一种方法来使用由 websocket 发送到 kafka 主题的消息(消息由 websocket 发送到地址 'ws://address:port/topic_name' 我想添加所有这些消息都发送到 kafka 主题)。 我阅读了有关 kafka connect 并试图找到一种方法来使用它,但它似乎不起作用...... 在此先感谢:)

【问题讨论】:

    标签: websocket apache-kafka


    【解决方案1】:

    Confluent 平台中没有连接到套接字的 Kafka 连接器。

    我在一个在生产中使用 Kafka 的团队工作,我们的源是一个套接字,所以您的选择是使用支持此套接字的平台->Kafka 生产,或者自己编写一个。

    关于可能的平台,我认为它们中的大多数都是矫枉过正的,尽管你可以利用它们来解决这个问题,一些选项是:
    1. NiFiMiniFi 对于较小的负载,请使用 PublishKafka Processor
    2.StreamSetsKafka Producer Destination
    3. Apache Flume- 不太推荐,这个项目是停止进化的。

    如果您想编写自己的生产者,您基本上必须在此端口上创建一个侦听器,并将传入的消息生成到 Kafka;如果这是一个 Web 套接字,只需获取请求的有效负载并将它们生成到 Kafka。
    可以从 tutorialspoint simple producer example* 复制示例 Kafka 生产者代码

    以下是一些开源项目示例:
    1.https://github.com/DataReply/kafka-connect-socket-source
    2.https://github.com/kafka-socket/miniature_engine
    3.https://github.com/dhanuka84/kafka-connect-tcp
    4.https://github.com/krux/tcp-stream-kafka-producer

    【讨论】:

    • “没有用于套接字的 Kafka 连接器” ...“这里有一些使用套接字源的项目的链接”... 它是什么?
    • 说“Kafka 连接器”是指 Confluent 的 Kafka Connect 框架。编辑了 Kafka Connect 而不是连接器的答案
    • Kafka Connect 是 Apache 许可的,不仅在 Confluent 平台中。他们编写连接器(插件)。
    【解决方案2】:

    Kafka 连接的想法是您有某种外部集成作为存储。这可以是 SAP、Salesforce、RDBMS、MQ 或任何其他有状态的东西。您的 websocket 端点没有数据,您不能轮询它是其他人在调用它,因此数据被传输。现在,如果您知道谁实际持有数据,那么您可以使用本指南构建连接器。 https://docs.confluent.io/current/connect/devguide.html

    对于您的特定情况,您可以做的最好的事情是使用 Kafka Producer API https://docs.confluent.io/current/clients/producer.html
    并且从您的 websocket enpoint 使用此生产者向主题发布消息,或者如果您使用 spring,您可以使用更高级别的抽象,这将是 KafkaTemplate https://docs.spring.io/spring-kafka/reference/html/#sending-messages

    【讨论】:

    • @cricket_007 by state 我的意思是数据。
    • 这也不对。有源连接器生成随机数据并且没有状态/数据
    • @cricket_007 你是说你的内存不是数据存储吗? :) 它被您的操作系统和语言隐藏的事实并没有减少它的存储空间:) 不要把它玩得太聪明,连接器 - 这个名字很好地说明了它的用途。
    • @cricket_007 另外,如果您决定在连接器中生成随机数据,您将搞砸 IMO 的单一责任原则。为了不破坏它,您将拥有某种外部机制来创建测试数据,然后您将通过连接器连接到它。但同样,存储和您要连接的是您的生成器,它可以在内存中,在这种情况下,RAM 就是您的存储。
    【解决方案3】:

    完全披露:我为 MigratorData 工作。

    您可以查看MigratoryData's solution for Kafka。 MigratorData 是一个可扩展的 WebSocket 服务器。 Kafka 的 MigratoryData Source/Sink 连接器利用 Kafka Connect API,可用于将数据从 Kafka 实时流式传输到 WebSocket 客户端,反之亦然。该解决方案的主要优势在于它将 Kafka 消息传递扩展到 WebSocket 客户端,同时保留了 Kafka 的关键特性,如保证交付、消息排序等。

    【讨论】:

    • @Mihai 链接到您自己的网站或内容(或您附属的内容)时,您must disclose your affiliation in the answer 以免被视为垃圾邮件。根据 Stack Exchange 政策,在您的用户名中包含与 URL 相同的文本或在您的个人资料中提及它不被视为充分披露。