【问题标题】:How do I send Protobuf Messages via a Kafka Producer(如何通过 Kafka Producer 发送 Protobuf 消息)
【发布时间】:2019-11-15 19:28:17
【问题描述】:

我正在使用 Sarama 库通过 Producer 发送消息。 这允许我发送字符串。我的目标是发送 Protobuf 消息

msg := &sarama.ProducerMessage{
            Topic: *topic,
            Value: sarama.StringEncoder(content),
        }

这是我拥有的示例 proto 消息定义

message Pixel {

    // Session identifier stuff
    int64 timestamp    = 1; // Milliseconds from the epoch
    string session_id  = 2; // Unique Identifier... for parent level0top
    string client_name = 3; // Client-name/I-key

    string ip = 10;
    repeated string ip_list = 11;
    string datacenter = 12;
    string proxy_type = 13;

请您提供一个示例,说明如何发送 protobuf 消息。

【问题讨论】:

    标签: go protocol-buffers kafka-producer-api


    【解决方案1】:

    您需要在生产者端使用 proto#Marshal 和 sarama#ByteEncoder,在消费者端使用 proto#Unmarshal。


    生产者:

                // Build the protobuf message and serialize it to bytes with proto.Marshal.
                // NOTE: `t` and `topic` come from the surrounding code (not shown in this excerpt).
                pixelToSend := &pixel.Pixel{SessionId: t.String()}
                pixelToSendBytes, err := proto.Marshal(pixelToSend)
                if err != nil {
                    log.Fatalln("Failed to marshal pixel:", err)
                }
    
                // Wrap the serialized bytes in sarama.ByteEncoder so they can be used
                // as the Kafka message value (instead of sarama.StringEncoder).
                msg := &sarama.ProducerMessage{
                    Topic: topic,
                    Value: sarama.ByteEncoder(pixelToSendBytes),
                }
    

    消费者:

            // Decode the raw Kafka message value back into a Pixel.
            // NOTE: `msg` is the consumed message from the surrounding code (not shown in this excerpt).
            receivedPixel := &pixel.Pixel{}
            err := proto.Unmarshal(msg.Value, receivedPixel)
            if err != nil {
                log.Fatalln("Failed to unmarshal pixel:", err)
            }
    
            log.Printf("Pixel received: %s", receivedPixel)
    

    完整示例:

    package main
    
    import (
        pixel "example/pixel"
        "log"
        "os"
        "os/signal"
        "syscall"
        "time"
    
        "github.com/Shopify/sarama"
        "github.com/golang/protobuf/proto"
    )
    
    // main runs a small end-to-end demo: a background goroutine publishes one
    // protobuf-encoded Pixel per second to a Kafka topic, while the main
    // goroutine consumes partition 0 of the same topic and decodes each message
    // back into a Pixel. The program exits on SIGHUP, SIGINT or SIGTERM, after
    // stopping the publisher and closing the consumer and producer.
    func main() {
        topic := "your-topic-name"
        brokerList := []string{"localhost:29092"}

        producer, err := newSyncProducer(brokerList)
        if err != nil {
            log.Fatalln("Failed to start Sarama producer:", err)
        }

        partitionConsumer, err := newPartitionConsumer(brokerList, topic)
        if err != nil {
            // log.Fatalln exits without running defers, so release the producer first.
            if closeErr := producer.Close(); closeErr != nil {
                log.Println("Failed to close Sarama producer:", closeErr)
            }
            log.Fatalln("Failed to create Sarama partition consumer:", err)
        }

        signals := make(chan os.Signal, 1)
        signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
        defer signal.Stop(signals)

        // done tells the publisher goroutine to stop; stopped is closed once it
        // has returned, so the producer is never closed while a send is in flight.
        done := make(chan struct{})
        stopped := make(chan struct{})

        go func() {
            defer close(stopped)

            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()

            for {
                select {
                case <-done:
                    return
                case t := <-ticker.C:
                    pixelToSend := &pixel.Pixel{SessionId: t.String()}
                    pixelToSendBytes, err := proto.Marshal(pixelToSend)
                    if err != nil {
                        log.Fatalln("Failed to marshal pixel:", err)
                    }

                    msg := &sarama.ProducerMessage{
                        Topic: topic,
                        Value: sarama.ByteEncoder(pixelToSendBytes),
                    }

                    // Only report success when the broker actually accepted the message.
                    if _, _, err := producer.SendMessage(msg); err != nil {
                        log.Println("Failed to send pixel:", err)
                        continue
                    }
                    log.Printf("Pixel sent: %s", pixelToSend)
                }
            }
        }()

        // Shutdown order matters: stop the publisher first, then close the
        // consumer and finally the producer it was using.
        defer func() {
            close(done)
            <-stopped

            if err := partitionConsumer.Close(); err != nil {
                log.Println("Failed to close Sarama partition consumer:", err)
            }
            if err := producer.Close(); err != nil {
                log.Println("Failed to close Sarama producer:", err)
            }
        }()

        log.Println("Waiting for messages...")

        for {
            select {
            case msg, ok := <-partitionConsumer.Messages():
                if !ok {
                    // A closed channel yields a nil message; stop instead of dereferencing it.
                    log.Print("Consumer message channel closed. Exiting.")
                    return
                }

                receivedPixel := &pixel.Pixel{}
                err := proto.Unmarshal(msg.Value, receivedPixel)
                if err != nil {
                    log.Fatalln("Failed to unmarshal pixel:", err)
                }

                log.Printf("Pixel received: %s", receivedPixel)
            case <-signals:
                log.Print("Received termination signal. Exiting.")
                return
            }
        }
    }
    
    // newSyncProducer builds a Sarama synchronous producer connected to the
    // given brokers. It enables Producer.Return.Successes on a default config
    // before creating the producer and returns whatever error Sarama reports.
    func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) {
        cfg := sarama.NewConfig()
        // TODO configure producer
        cfg.Producer.Return.Successes = true

        return sarama.NewSyncProducer(brokerList, cfg)
    }
    
    // newPartitionConsumer connects to the given brokers and returns a consumer
    // for partition 0 of topic, starting from the oldest available offset.
    //
    // If the partition consumer cannot be created, the parent consumer is closed
    // before returning so its connections are not leaked.
    func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) {
        conf := sarama.NewConfig()
        // TODO configure consumer
        consumer, err := sarama.NewConsumer(brokerList, conf)
        if err != nil {
            return nil, err
        }

        partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
        if err != nil {
            // The caller never sees the parent consumer, so it must be released here.
            if closeErr := consumer.Close(); closeErr != nil {
                log.Println("Failed to close Sarama consumer:", closeErr)
            }
            return nil, err
        }

        return partitionConsumer, nil
    }
    

    【讨论】:

    • 谢谢你。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-11-01
    • 2021-11-09
    • 1970-01-01
    • 2013-07-22
    • 1970-01-01
    • 2019-02-14
    • 2016-07-16
    相关资源
    最近更新 更多