【问题标题】:Producing avro message using php-enqueue使用 php-enqueue 生成 avro 消息
【发布时间】:2020-10-28 19:00:40
【问题描述】:

我正在研究一种使用 php-enqueue 将 avro 消息从 php 生成到 kafka 的方法。

他们的documentation 声明您可以使用其他格式,包括 Apache Avro。

默认情况下,传输将消息序列化为 json 格式,但您 可能想要使用另一种格式,例如 Apache Avro。为此你 必须实现 Serializer 接口并将其设置为上下文, 生产者或消费者。如果将序列化程序设置为上下文,它将是 注入到上下文创建的所有消费者和生产者。

<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
    public function toMessage($string) {}

    public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $context */

$context->setSerializer(new FooSerializer());

示例中的序列化程序正在与字符串相互转换。据我所知,Avro 格式是二进制的,那么在这种情况下自定义序列化程序应该如何工作?

【问题讨论】:

    标签: php apache-kafka avro kafka-producer-api php-enqueue


    【解决方案1】:

    PHP 字符串可以包含二进制数据。这是使用已在模式注册表中注册的模式 id 生成 avro 消息的部分实现。 使用 jaumo/avro 实现对 avro 的序列化。

    public function toString(RdKafkaMessage $message): string
    {
        ...
    
        $message = json_decode($message->getBody(), true);
    
        $encodedHeader = $this->createAvroHeader($schemaId);
        $encodedMessage = Serde::encodeMessage($parsedSchema, $message);
    
        return $encodedHeader . $encodedMessage;
    }
    
    private function createAvroHeader(int $schemaId): string
    {
        $binarySchemaId = hex2bin(sprintf("%08s", dechex($schemaId)));
        return pack("C", 0) . $binarySchemaId;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-03-09
      • 2021-10-04
      • 2018-05-17
      • 1970-01-01
      • 2019-09-11
      • 1970-01-01
      • 2016-08-28
      • 2016-11-10
      相关资源
      最近更新 更多