【问题标题】:How to integrate akka streams kafka (reactive-kafka) into akka http application?如何将akka流kafka(reactive-kafka)集成到akka http应用程序中?
【发布时间】:2018-02-16 08:47:33
【问题描述】:

我有一个基本的 scala akka http CRUD 应用程序。相关类见下文。

我只想在创建/更新实体时将实体 ID 和一些数据(作为 json)写入 Kafka 主题。

我正在查看http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我是 scala 和 akka 的新手,不确定如何将它集成到我的应用程序中?

例如,从上面的文档中,这是一个生产者写给 kafka 的例子,所以我想我需要类似的东西,但是我的应用程序应该去哪里?创建用户后,我可以在我的服务的 create 方法中添加另一个地图调用吗?

非常感谢!

val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))

或者我是否需要在我的 Server.scala 的 bindAndHandle() 方法中执行类似 https://github.com/hseeberger/accessus 的示例?

WebServer.scala

object System {

  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()

}

object WebServer extends App {

  import System._

  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)

}

Router.scala

def routes(): Route = {
    post {
      entity(as[User]) { user =>
        val createUser = userService.create(user)
        onSuccess(createUser) {
          case Invalid(y: NonEmptyList[Err]) =>  {
            throw new ValidationException(y)
          }
          case Valid(u: User) => {
              complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    } ~
    // More routes here, left out for example  
}

Service.scala

def create(user: User): Future[MaybeValid[User]] = {
    for {
      validating <- userValidation.validateCreate(user)
      result <- validating match {
        case Valid(x: User) =>
          userRepo.create(x)
            .map(dbUser => Valid(UserConverters.fromUserRow(x)))
        case y: DefInvalid =>
          Future{y}
      }
    } yield result
  }

回购.scala

def create(user: User): Future[User] = {
    mutateDbProvider.db.run(
      userTable returning userTable.map(_.userId)
        into ((user, id) => user.copy(userId = id)) +=
        user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
    )
  }

【问题讨论】:

  • 只是为了明确您想要什么:您希望每次create 返回有效用户时都将user.userId 发送到kafka?
  • 是的,也许还有一些其他信息,例如“实体”:“用户”

标签: scala apache-kafka akka akka-stream reactive-kafka


【解决方案1】:

既然你已经写了你的Route 到 unmarshall 1 UserEntity 我不认为你需要Producer.plainSink。相反,我认为Producer.send 也可以。另外,作为旁注,抛出异常不是“惯用的”scala。所以我改变了无效用户的逻辑:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""")

          onComplete(producer send producerRecord) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

【讨论】:

  • @Rory 欢迎您。我不熟悉与您提供的示例类似的任何测试套件。
  • 您是否看到 not 使用 onComplete 调用 'producer send producerRecord' 有任何问题(因此我们不会在发送 HTTP API 之前等待发送到 kafka 完成回应用户)?基本上只是这样做:生产者发送生产者记录;完成(ToResponseMarshallable((StatusCodes.Created,u)))
  • @Rory 通过不使用onComplete,客户端可能会收到Created 响应代码,但不会发送消息。如果这种分叉状态正常,则没有问题,但通常的模式是仅在提交记录后才返回有效响应。这是一个设计选择:您想要在给定代码可能不正确的情况下快速响应,还是您想要等待验证代码?
  • 感谢 Ramon,对于我的用例,即使发送到 kafka 失败,我总是向用户返回 Created 响应(我正在以不同的方式处理这些错误),所以我想我是确定删除 onComplete。再次感谢。
猜你喜欢
  • 1970-01-01
  • 2018-03-08
  • 2017-07-17
  • 1970-01-01
  • 2018-01-19
  • 2019-09-02
  • 2019-04-16
  • 2017-01-30
  • 2018-09-12
相关资源
最近更新 更多