【Question title】: Kafka - Scala - processing multiple messages
【Posted】: 2018-01-04 07:01:26
【Question】:

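Is it possible to send an array of strings through a Kafka Producer object? I want to take some messages (lines of text) from 'topic1', split each one into individual words, and send those words to another topic.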

    import akka.actor.ActorSystem
    import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
    import akka.kafka.scaladsl.{Consumer, Producer}
    import akka.stream.ActorMaterializer
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}

    object KafkaConsumer extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log

      // PRODUCER config
      val producerSettings = ProducerSettings(
        actorSystem,
        new ByteArraySerializer,
        new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // CONSUMER config
      val consumerSettings = ConsumerSettings(
        system = actorSystem,
        keyDeserializer = new ByteArrayDeserializer,
        valueDeserializer = new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("kafka-sample")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      // -----------------------------------------------------------------------//

      // ROUTE OF THE APP: each consumed message becomes exactly one produced message
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("topic1"))
        .map { msg =>
          println(s"topic1 -> topic2: $msg")
          ProducerMessage.Message(
            new ProducerRecord[Array[Byte], String]("topic2", msg.record.value),
            msg.committableOffset)
        }
        .runWith(Producer.commitableSink(producerSettings))
    }

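【Question comments】:

  • The quick answer is yes; you can also send your object as JSON.
  • My main goal is to send each element of that array. I mean: the consumer takes "xxxx xxx xx x" from topic1, and I want the Producer to send 4 messages: "xxxx", "xxx", "xx", "x". Can you help me with this?
  • Maybe I don't understand, but you can split your message and then call send (produce) 4 times.
  • That's the problem, because I can't. If I try to call a for loop 4 times to send the messages, I get an error because .runWith cannot resolve a reference with such a signature.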
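A plain-Scala sketch of the split the asker describes, with no Kafka involved: one consumed line such as "xxxx xxx xx x" becomes one value per word. WordSplit and splitWords are hypothetical names, and splitting on runs of whitespace (instead of a single space) is an assumption so that repeated spaces do not turn into empty messages.

```scala
object WordSplit {
  // Hypothetical helper: turns one consumed line into the values to produce, one per word.
  // Splitting on runs of whitespace (rather than a single " ") avoids empty words.
  def splitWords(line: String): List[String] =
    line.trim.split("\\s+").toList.filter(_.nonEmpty)
}
```

A function of this shape (one input element, a List of outputs) is the kind of function mapConcat takes in Akka Streams.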

Tags: scala apache-kafka akka akka-stream


【Solution 1】:

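The Akka Streams example creates a simple stream that reads one message, produces it to Kafka with a Sink, and commits the offset of the consumed message. If you need to read one or more messages and produce many messages, one for each word in the consumed set, you should make more use of the Akka Streams Graph API.

This example uses a graph to build a Source from Kafka and uses groupedWithin to read a batch of messages and extract the words they contain.

It creates two simple flows, one to obtain the last offset and another to obtain the words. It then creates a Source stage that broadcasts each batch consumed from Kafka to both flows and zips the results into a tuple (CommittableOffset, Array[String]). The runForeach function produces the messages. Note that the messages are not produced in order: all the futures are started in parallel, and Future.sequence only waits for all of them to complete.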
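To make the two flows and the Zip concrete, here is a plain-Scala model of what happens to one batch emitted by groupedWithin. Msg is a hypothetical stand-in for CommittableMessage that keeps only an offset and the record value; the real code zips a CommittableOffset rather than a Long.

```scala
object BatchModel {
  // Hypothetical stand-in for CommittableMessage: only the offset and the record value.
  final case class Msg(offset: Long, value: String)

  // Mirrors flowCommit: commit the offset of the last message in the batch.
  // groupedWithin never emits an empty batch, and this model assumes the same.
  def lastOffset(batch: Seq[Msg]): Long = batch.last.offset

  // Mirrors _flowWords: join all record values and split them into words.
  def words(batch: Seq[Msg]): Array[String] =
    batch.map(_.value).mkString(" ").split(" ")

  // Mirrors Broadcast + Zip: both results come from the same batch.
  def process(batch: Seq[Msg]): (Long, Array[String]) =
    (lastOffset(batch), words(batch))
}
```

Because Broadcast sends the same batch to both flows and each flow emits exactly one element per batch, Zip always pairs an offset with the words of that same batch.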

Although the example looks long, it compiles and works with "com.typesafe.akka" %% "akka-stream-kafka" % "0.14":

    import java.util.Properties

    import akka.actor.ActorSystem
    import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffset}
    import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.{ActorMaterializer, SourceShape}
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Source, Zip}

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.serialization.{
      ByteArrayDeserializer,
      ByteArraySerializer,
      StringDeserializer,
      StringSerializer
    }

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    object SplitSource extends App {

      implicit val actorSystem = ActorSystem("test-actor-system")
      implicit val streamMaterializer = ActorMaterializer()
      implicit val executionContext = actorSystem.dispatcher
      val log = actorSystem.log

      // PRODUCER config (not used by the graph below, which publishes through kafkaProducer)
      // Note: auto.create.topics.enable is a broker setting; the producer ignores it
      val producerSettings = ProducerSettings(actorSystem,
                                              new ByteArraySerializer,
                                              new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withProperty("auto.create.topics.enable", "true")

      // CONSUMER config
      val consumerSettings =
        ConsumerSettings(system = actorSystem,
                         keyDeserializer = new ByteArrayDeserializer,
                         valueDeserializer = new StringDeserializer)
          .withBootstrapServers("localhost:9092")
          .withGroupId("kafka-sample4")
          .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

      // Properties for the plain Java KafkaProducer used by publishToKafka
      val producerConfig = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092")
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)
        props
      }

      lazy val kafkaProducer = new KafkaProducer[String, String](producerConfig)

      // Create a Scala Future from the Java Future returned by send (blocks a thread on get())
      private def publishToKafka(id: String, data: String) = {
        Future {
          kafkaProducer
            .send(new ProducerRecord("outTopic", id, data))
            .get()
        }
      }

      def getKafkaSource =
        Consumer
          .committableSource(consumerSettings, Subscriptions.topics("inTopic"))
          // Emits a batch after 10 messages or 30 seconds, whichever comes first
          .groupedWithin(10, 30.seconds)

      val getStreamSource = GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val in = getKafkaSource

        // Broadcast to two flows: one obtains the last offset to commit,
        // the other returns the words to publish
        val br = b.add(Broadcast[Seq[CommittableMessage[Array[Byte], String]]](2))
        val zipResult = b.add(Zip[CommittableOffset, Array[String]]())
        // groupedWithin never emits an empty batch, so .last is safe here
        val flowCommit = Flow[Seq[CommittableMessage[Array[Byte], String]]].map(_.last.committableOffset)

        // Flow that creates the list of all words in all consumed messages
        val _flowWords =
          Flow[Seq[CommittableMessage[Array[Byte], String]]].map(input => {
            input.map(_.record.value()).mkString(" ").split(" ")
          })

        // Build the stage
        in ~> br ~> flowCommit ~> zipResult.in0
              br ~> _flowWords ~> zipResult.in1

        SourceShape(zipResult.out)
      }

      Source.fromGraph(getStreamSource).runForeach { msgs =>
        // Publish all words; when all futures complete, commit the last Kafka offset
        val futures = msgs._2.map(publishToKafka("outTopic", _)).toList

        // Produces in parallel! Chain the futures with flatMap to produce in order
        Future.sequence(futures).onComplete {
          case Success(_) =>
            // Once all futures are done, commit the offset of the last consumed message
            msgs._1.commitScaladsl()
          case Failure(e) =>
            // Publishing failed: log the error and skip the commit for this batch
            log.error(e, "Publishing words failed; offset not committed")
        }
      }
    }
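The comment in runForeach suggests flatMap for in-order production. A minimal sketch of that idea with plain Scala Futures, assuming a hypothetical publish function in place of publishToKafka: each send starts only after the previous one has completed, at the cost of losing the parallelism of Future.sequence.

```scala
import scala.concurrent.{ExecutionContext, Future}

object OrderedPublish {
  // Publishes the words one after another: each publish starts only after the
  // previous Future has completed, so the words are sent in their original order.
  // `publish` is a hypothetical stand-in for publishToKafka.
  def publishInOrder[A](words: Seq[String])(publish: String => Future[A])(
      implicit ec: ExecutionContext): Future[List[A]] =
    words
      .foldLeft(Future.successful(List.empty[A])) { (acc, word) =>
        // Chain the next publish onto the previous result; results are prepended
        acc.flatMap(done => publish(word).map(result => result :: done))
      }
      .map(_.reverse) // restore the original order of the results
}
```

The commit of msgs._1 would then go in the completion of the Future returned by publishInOrder instead of Future.sequence.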

The Akka Streams API lets you build powerful processing pipelines.

【Discussion】:

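【Solution 2】:

You should use mapConcat before map, because it "transforms each input element into an Iterable of output elements that is then flattened into the output stream".

The complete additional lines are: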

    Subscriptions.topics("topic1"))
      .mapConcat { msg => msg.record.value().split(" ").toList }
      .map { ...
    

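【Discussion】:

• This is a simple and great solution, but note that if you map every message to a list of strings, you can no longer get the committable offset from it.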
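One way to keep a committable offset with mapConcat, sketched in plain Scala under stated assumptions: each message is expanded into (word, Option[offset]) pairs and only the last word carries the offset, so the offset would be committed only after all of that message's words have been produced. Msg is a hypothetical stand-in for CommittableMessage and List.flatMap stands in for mapConcat; in Akka Streams the Option could travel as the passThrough of a ProducerMessage.Message, which is not shown here.

```scala
object WordsWithOffset {
  // Hypothetical stand-in for CommittableMessage: offset plus record value.
  final case class Msg(offset: Long, value: String)

  // What a mapConcat function could return per message: one element per word.
  // Only the last word carries the offset, so a downstream stage would commit
  // only after every word of that message has been produced.
  def explode(msg: Msg): List[(String, Option[Long])] = {
    val words = msg.value.split(" ").toList.filter(_.nonEmpty)
    words.zipWithIndex.map { case (w, i) =>
      (w, if (i == words.size - 1) Some(msg.offset) else None)
    }
  }
}
```

A message with no words yields no elements, so its offset is only covered once a later offset is committed.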