【问题标题】:HTTP Request Source to a Kafka Sink?到 Kafka 接收器的 HTTP 请求源?
【发布时间】:2020-06-16 20:44:01
【问题描述】:

如何将播放 POST 路由连接到 Kafka Sink?

我找到了https://github.com/jamesward/hello-play-kafka。 但它使用随机数的刻度源附加到 Kafka 接收器。1

如何使 POST 请求路由成为绑定到 Kafka 接收器的源?

编辑:POST 请求正文的格式为 json,Content-Type 为 application/json。发送到 kafka 的消息应该是完全相同的 json。该路由需要一条 json 消息。

【问题讨论】:

  • 请在问题中添加更多细节:POST请求数据的格式是什么?你能描述一下这应该如何转化为发布到 Kafka 的消息吗?每个请求中是否有消息流,或者您想在一条消息中将完整的 POST 正文发送到 Kafka?
  • @TimMoore 我添加了你提到的编辑。

标签: apache-kafka playframework akka-stream


【解决方案1】:

也许我误解了你的问题,但你可以使用 REST 代理向 Kafka 发送消息:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      --data '{ "records": [ { "value": { "name": "testUser0" } }, { "value": { "name": "testUser1" } } ] }' \
      "http://localhost:8082/topics/jsontest"

参考:Confluent REST Proxy docs

【讨论】:

  • 我已经考虑过了。我没有这样做,因为我可能希望将来在此流程中添加诸如 authZ 和 authN 以及其他接收器之类的功能。
【解决方案2】:

由于您的 POST 请求是一对一地直接进入 Kafka,因此 Akka Streams 提供的流模型是否适合还不清楚。直接使用KafkaProducer API 可能更直接,一次将一条消息发送到 Kafka 主题。这将使代码,尤其是错误处理更加简单。

可能仍然有充分的理由更喜欢使用 Akka Streams,例如管理并发或排序。具体原因可能会影响使用它的最佳方法,因此如果您对此有更多详细信息,请添加到问题中,我会更新答案。

例如,如果您想确保没有并发写入,您可以在 Guice 中将一个类注册为单例(假设您使用 Guice 进行依赖注入)。创建后,它会使用 Source.queueSource.actorRefSource.actorRefWithBackpressure 以及 Kafka Sink 运行 Akka Stream。如果出现错误,它还需要使用RestartSource 来确保流重新启动。公开一个公共方法,允许调用者向物化源队列或参与者发送消息,然后将单例注入到您的控制器类中,以允许它使用该方法发布其 POST 数据。

这种方法的一个大缺点是控制器操作只能知道消息已成功发送到流,而不是一直成功写入 Kafka。如果 Kafka 代理关闭、应用程序崩溃或由于其他原因写入失败,则消息可能会丢失,即使在向发起客户端返回成功结果之后也是如此。这不是所有用例的问题,但需要考虑。

【讨论】:

    猜你喜欢
    • 2011-09-27
    • 1970-01-01
    • 2018-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-21
    相关资源
    最近更新 更多