【问题标题】:Producing and Consuming Avro messages from Kafka without Confluent components在没有 Confluent 组件的情况下从 Kafka 生产和使用 Avro 消息
【发布时间】:2016-09-14 09:22:33
【问题描述】:

我正在尝试找到一个可以从 kafka 生成和订阅 avro 消息的示例。

此时,我想使用没有任何融合插件的“香草”kafka 部署。

这可能吗?到目前为止,我发现的所有示例都很快开始使用融合特定工具来处理 avro 消息。

我确信应该有一种方法可以让我在 kafka 平台上发布和使用 avro 消息,而无需任何“特定于分发”的插件。

【问题讨论】:

  • 如果我理解您的问题,那么在 Kafka 中没有任何内置方法可以生成和加载 avro 消息。基本上,您将使用像 fastavro 这样的 avro 客户端在生成到 Kafka 之前立即序列化为 avro 格式,并在从主题消费后立即加载它。
  • 那么像在自定义序列化程序中包装 avro 序列化/反序列化逻辑吗?
  • 嗯...或多或少,在生成主题之前将其序列化为avro格式。根据您使用的客户端,可能会有执行此操作的快捷方式。

标签: apache-kafka kafka-consumer-api kafka-producer-api


【解决方案1】:

当然,您可以在没有任何 Confluent 工具的情况下做到这一点。但是您必须自己做一些额外的工作(例如在您的应用程序代码中)——这是提供与 Avro 相关的工具(例如您提到的来自 Confluent 的工具)的最初动机。

一种选择是直接使用 Apache Avro Java API 手动序列化/反序列化 Kafka 消息的负载(例如,从 YourJavaPojobyte[])。 (我想您暗示 Java 是首选的编程语言。)这会是什么样子?这是一个例子。

  • 首先,您将在将数据写入 Kafka 的应用程序中手动序列化数据负载。在这里,您可以使用 Avro 序列化 API 对有效负载进行编码(从 Java pojo 到 byte[]),然后使用 Kafka 的 Java 生产者客户端将编码后的有效负载写入 Kafka 主题。
  • 然后,在数据管道的下游,您将在另一个从 Kafka 读取数据的应用程序中反序列化。在这里,您可以使用 Kafka 的 Java 消费者客户端从同一 Kafka 主题读取(编码)数据,并使用 Avro 反序列化 API 再次将有效负载解码回来(从 byte[] 到 Java pojo)。

当然,在使用 Kafka Streams(即将推出的 Apache Kafka 0.10 中包含)或 Apache Storm 等流处理工具时,您也可以直接使用 Avro API。

最后,您还可以选择使用一些实用程序库(来自 Confluent 或其他地方),这样您就不必直接使用 Apache Avro API。对于它的价值,我在kafka-storm-starter 上发布了一些稍微复杂的示例,例如如AvroDecoderBolt.scala 所示。在这里,Avro 序列化/反序列化是通过使用 Scala 库Twitter Bijection 完成的。下面是一个AvroDecoderBolt.scala 的示例 sn-p,让您大致了解一下:

  // This tells Bijection how to automagically deserialize a Java type `T`,
  // given a byte array `byte[]`.
  implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]

  // Let's put Bijection to use.
  private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
    require(bytes != null, "bytes must not be null")
    val decodeTry = Injection.invert(bytes)  // <-- deserialization, using Twitter Bijection, happens here
    decodeTry match {
      case Success(pojo) =>
        log.debug("Binary data decoded into pojo: " + pojo)
        collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
        ()
      case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
    }
  }

所以是的,您当然可以选择不使用任何其他库,例如 Confluent 的 Avro 序列化器/反序列化器(目前作为confluentinc/schema-registry 的一部分提供)或Twitter's Bijection。是否值得付出额外的努力由您决定。

【讨论】:

    猜你喜欢
    • 2016-11-10
    • 2019-02-19
    • 2019-01-15
    • 2019-07-12
    • 2017-07-23
    • 2019-05-14
    • 2020-11-23
    • 1970-01-01
    • 2019-11-19
    相关资源
    最近更新 更多