【Question title】: What is a good pattern for committing a Kafka consumer offset after processing a message?
【Posted】: 2017-03-06 20:14:30
【Question description】:

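I'm using Akka Streams Kafka to pipe Kafka messages to a remote service. I want to guarantee that the service receives each message exactly once (at-least-once and at-most-once delivery).

Here is the code I came up with: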

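  // Imports assumed by this snippet (Akka Streams Kafka 0.x era API)
  import akka.actor.{ActorRef, ActorSystem}
  import akka.kafka.ConsumerMessage.CommittableMessage
  import akka.kafka.{ConsumerSettings, Subscriptions}
  import akka.kafka.scaladsl.Consumer
  import akka.pattern.ask
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import akka.util.Timeout
  import com.typesafe.config.Config
  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.StringDeserializer
  import scala.concurrent.duration._

  private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicPattern: String,
                           mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]): Unit = {

    val groupId = config.getString("group-id")

    implicit val materializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
    import system.dispatcher // the ExecutionContext that will be used in ask call below

    Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
      // keep the original message next to the converted one sent to the subscriber
      .map(message => (message, mapCommittableMessageToSinkMessage(message)))
      // wait for the subscriber's reply, then pass the original message on
      .mapAsync(1) { case (message, converted) => ask(subscriber, converted).map(_ => message) }
      // commit the offset only after the subscriber has replied
      .mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .runWith(Sink.ignore)
  }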

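As the code shows, each original message is mapped to a tuple of the message itself and the transformed message that is passed to the subscriber (the actor that sends to the remote service). The purpose of the tuple is to commit the original message's offset only after the subscriber has finished processing.

Something about this feels like an anti-pattern, but I'm not sure there is a better way to do it. Any suggestions?

Thanks!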
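A note on the guarantee asked for above: committing only after the subscriber replies, as this code does, gives at-least-once delivery rather than exactly-once. If the process dies after the remote service has received a message but before its offset is committed, that message is read again on restart; committing first flips this into at-most-once, because a crash between commit and processing loses the message. The sketch below is a plain-Scala simulation, not Kafka or Akka code; `DeliverySemantics`, `deliveries`, and the "crash exactly once, between the two steps" model are hypothetical and chosen only for illustration.

```scala
// Hypothetical plain-Scala simulation (no Kafka/Akka): one consumer handles offsets
// 0 until n and crashes exactly once, at offset crashAt, between its two steps
// (process and commit). After the crash it restarts from the last committed offset.
object DeliverySemantics {

  // Returns how many times the remote service received each offset.
  def deliveries(n: Int, crashAt: Int, commitAfterProcessing: Boolean): Vector[Int] = {
    val received = Array.fill(n)(0)
    var committed = 0 // next offset to read on (re)start
    var hasCrashed = false
    var offset = 0
    while (offset < n) {
      val crashNow = offset == crashAt && !hasCrashed
      if (commitAfterProcessing) {
        received(offset) += 1 // 1. service receives the message
        if (crashNow) {
          hasCrashed = true
          offset = committed // crash before commit: restart re-reads this offset
        } else {
          committed = offset + 1 // 2. commit
          offset += 1
        }
      } else {
        committed = offset + 1 // 1. commit first
        if (crashNow) {
          hasCrashed = true
          offset = committed // crash before processing: restart skips this offset
        } else {
          received(offset) += 1 // 2. service receives the message
          offset += 1
        }
      }
    }
    received.toVector
  }
}
```

With five messages and a crash at offset 2, `deliveries(5, 2, commitAfterProcessing = true)` returns `Vector(1, 1, 2, 1, 1)` (one duplicate), while `commitAfterProcessing = false` returns `Vector(1, 1, 0, 1, 1)` (one lost message).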

【Question discussion】:

    Tags: akka-stream akka-kafka


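    【Solution 1】:

    One way to keep this cleaner and easier to change is to use GraphDSL. It lets you spawn one branch that carries the Committable part of the message, while another branch performs all the needed business logic.

    An example of the graph could be (all boilerplate omitted for clarity):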

    val src = Consumer.committableSource(consumerSettings, Subscriptions
          .topicPattern(topicPattern))
    
    val businessLogic = Flow[CommittableMessage[String, String]].mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))
    
    // `.to` (not `.runWith`) turns this Flow into a Sink that can be wired into the graph
    val snk = Flow[CommittableMessage[String, String]].mapAsync(1)(message => message.committableOffset.commitScaladsl())
          .to(Sink.ignore)  // look into Sink.foldAsync for a more compact re-write of this part
    
    src ~> broadcast
           broadcast ~> businessLogic ~> zip.in0
           broadcast         ~>          zip.in1
                                         zip.out.map(_._2) ~> snk
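The graph above sends each message down two branches and pairs them again with `Zip`, which matches elements by position. A plain-Scala list model (no Akka; `PipelineModel`, `Msg`, `reply`, and `zipAfterDroppingOne` are hypothetical stand-ins for `CommittableMessage`, the subscriber's answer, and a misbehaving branch) suggests why this commits the same offsets in the same order as the tuple version in the question, provided the business branch emits exactly one element per input, in input order, as `mapAsync(1)` does.

```scala
// Hypothetical list model of the two pipelines; no Akka involved.
object PipelineModel {
  // Stand-in for CommittableMessage[String, String]: only the offset and value matter here.
  final case class Msg(offset: Long, value: String)

  // Stand-in for the subscriber's reply to `ask`.
  def reply(m: Msg): String = s"ack-${m.value}"

  // Question's approach: carry the original message next to the reply, keep the message.
  def viaTuple(msgs: List[Msg]): List[Msg] =
    msgs
      .map(m => (m, reply(m)))
      .map { case (m, _) => m }

  // Answer's approach: broadcast to two branches, zip them by position, keep _._2.
  def viaBroadcastZip(msgs: List[Msg]): List[Msg] = {
    val businessBranch: List[String] = msgs.map(reply) // one reply per message, in order
    val passThrough: List[Msg] = msgs
    businessBranch.zip(passThrough).map(_._2)
  }

  // Counter-example: if the business branch drops an element (e.g. a filter),
  // positional zipping pairs a reply with the wrong message.
  def zipAfterDroppingOne(msgs: List[Msg], dropValue: String): List[(String, Msg)] =
    msgs.filterNot(_.value == dropValue).map(reply).zip(msgs)

  // The commit stage only needs each message's offset.
  def committedOffsets(toCommit: List[Msg]): List[Long] = toCommit.map(_.offset)
}
```

For `msgs = List(Msg(0, "a"), Msg(1, "b"), Msg(2, "c"))`, both pipelines yield offsets `List(0, 1, 2)`, whereas `zipAfterDroppingOne(msgs, "b")` pairs `ack-c` with the message at offset 1.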
    

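    【Discussion】:

    • Thanks Stefano! Your approach worked. It's more code than my approach, but I'm new to Akka, so I can see how GraphDSL could be a more scalable approach for complex flows. I'll post the code with the boilerplate in a separate answer.
    【Solution 2】:

    Here is the complete working code using @stefano-bonetti's approach from the answer above: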

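      // Imports assumed by this snippet (Akka Streams Kafka 0.x era API)
      import akka.NotUsed
      import akka.actor.{ActorRef, ActorSystem}
      import akka.kafka.ConsumerMessage.CommittableMessage
      import akka.kafka.{ConsumerSettings, Subscriptions}
      import akka.kafka.scaladsl.Consumer
      import akka.pattern.ask
      import akka.stream.{ActorAttributes, ActorMaterializer, ClosedShape, Supervision}
      import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Zip}
      import akka.util.Timeout
      import com.typesafe.config.Config
      import org.apache.kafka.clients.consumer.ConsumerConfig
      import org.apache.kafka.common.serialization.StringDeserializer
      import scala.concurrent.duration._

      private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                                 topicSuffix: String,
                                 convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]): Unit = {

        val groupId = config.getString("group-id")
        val subscriberName = subscriber.path.name
        val customerId = config.getString("customer-id")
        // "\\." is a literal dot in the regex, "$$" a literal $ (end anchor)
        val topicPattern = s"^$customerId\\.$topicSuffix$$"

        implicit val materializer = ActorMaterializer()

        val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
          .withGroupId(s"$groupId.$subscriberName")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

        implicit val timeout = Timeout(5.seconds) // timeout for reply message on ask call below
        import system.dispatcher // the ExecutionContext that will be used in ask call below

        val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))

        // business branch: one reply per message, emitted in upstream order
        val businessLogic = Flow[CommittableMessage[String, String]]
          .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message)))

        // commit branch: commit the offset of every message that reaches it
        val snk = Flow[CommittableMessage[String, String]]
          .mapAsync(1)(message => message.committableOffset.commitScaladsl())
          .to(Sink.ignore)

        // log any failure (cause first, then message) and stop the stream
        val decider: Supervision.Decider = {
          case e =>
            system.log.error(e, "error in stream")
            Supervision.Stop
        }

        val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import GraphDSL.Implicits._

          val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
          val zip = builder.add(Zip[Any, CommittableMessage[String, String]])

          src ~> broadcast
          broadcast ~> businessLogic ~> zip.in0
          broadcast ~> zip.in1
          zip.out.map(_._2) ~> snk

          ClosedShape
        })
          .withAttributes(ActorAttributes.supervisionStrategy(decider))
          .run(materializer)
      }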
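The topic pattern in `startStream` relies on Scala string interpolation: inside an `s"..."` string, `\\.` becomes the regex escape `\.` (a literal dot) and `$$` becomes a literal `$` (the end anchor). Kafka's pattern subscription is based on `java.util.regex.Pattern`, so a quick check with the JDK regex classes shows which topic names such a pattern accepts. `TopicPatternCheck`, its methods, and the example values `acme` and `orders` are hypothetical, invented for illustration.

```scala
// Hypothetical helper reproducing the pattern built in startStream.
object TopicPatternCheck {
  // In an s-string, "\\." yields \. and "$$" yields $, giving e.g. ^acme\.orders$.
  // Note: customerId and topicSuffix are inserted unescaped; java.util.regex.Pattern.quote
  // would be needed if they could contain regex metacharacters.
  def topicPattern(customerId: String, topicSuffix: String): String =
    s"^$customerId\\.$topicSuffix$$"

  // Whole-string match of a topic name against the pattern.
  def matches(pattern: String, topic: String): Boolean =
    java.util.regex.Pattern.compile(pattern).matcher(topic).matches()
}
```

For `topicPattern("acme", "orders")`, the topic `acme.orders` matches, while `acmeXorders` and `acme.orders.retry` do not.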
    

    【Discussion】:
