【问题标题】:A journey from akka-stream to fs2 - how to define an akka-stream http flow like stage in fs2 using http4s从 akka-stream 到 fs2 的旅程 - 如何使用 http4s 在 fs2 中定义类似阶段的 akka-stream http 流
【发布时间】:2021-11-28 20:59:15
【问题描述】:

我正在加深我在 fs2 方面的知识,并想尝试 fs2-kafka 来替换 akka 流的用例。这个想法很简单,从 kafka 读取数据并通过 http 请求将数据发布到接收器,然后在成功时提交回 kafka。到目前为止,我无法真正弄清楚 http 部分。在 akka 流/akka http 中,你有一个开箱即用的流 https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

与 akka 流完美集成。

我试图看看我是否可以用 http4s 和 fs2 做类似的事情。

有没有人有任何参考资料、代码示例、博客以及其他说明如何进行这种集成的东西。到目前为止,我唯一能想到的是将流包装到客户端资源的使用方法中,即

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

即使这样我也不确定整个事情

【问题讨论】:

  • someStrem.evalMap(data => sendHttpRequest(someClient, data)) - someClient 是在哪里创建的,例如 Stream.resource(SomeClientBuilder....build).flatMap { someClient => ??? }?或者问题是什么?如何创建客户端? http请求怎么做?
  • 我想你几乎回答了我的问题。我正在寻找确认我很清楚该怎么做。我使用 akka 流已经很长时间了,正如你所知,他们有大量的文档和 ecosys 以及很多示例。 fs2 或 fs2-kafka 不是这种情况(相比之下)。因此,根据您对生态系统本质的理解程度,您需要自己弄清楚这个特定的用例。
  • 虽然我在全猫效应生态系统方面变得越来越好,但我仍在学习,仍然没有 100% 的自信。因此,我正在寻找现成的示例,例如您在 akka 文档中的内容,以确认我走的是正确的道路
  • 所以问题是,您如何创建客户端,并在您的流中使用它来实际发送您从 Kafka 消费的数据?你能提供一个完整的基本想法,然后我会弄清楚其他的一切。谢谢
  • 但是我想我已经可以使用你提供的东西了:)

标签: scala http4s fs2


【解决方案1】:

typelevel 生态系统的特点是,一切都只是一个库,您不需要说明它们之间有多少交互的示例,您只需要了解每个库的工作原理和基本的组成规则。

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}

请注意,我是在头脑中编写了所有这些内容,并且只是快速浏览了文档,您需要更改许多内容(尤其是类型,例如 DataResult .以及调整错误处理和何时提交回 Kafka 等内容。
不过,我希望这有助于您了解如何构建代码。

【讨论】:

  • 非常感谢您。它有帮助:)
猜你喜欢
  • 2020-08-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-03
  • 2021-10-19
  • 1970-01-01
  • 2018-03-08
  • 1970-01-01
相关资源
最近更新 更多