【问题标题】:Golang segmentio/kafka-go consumer not workingGolang segmentio/kafka-go 消费者不工作
【发布时间】:2019-09-04 18:56:06
【问题描述】:

我正在使用segmentio/kafka-go 连接到 Kafka。

// to produce messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
    kafka.Message{Value: []byte("one!")},
    kafka.Message{Value: []byte("two!")},
    kafka.Message{Value: []byte("three!")},
)

conn.Close()

我可以使用此代码在我的 Kafka 服务器中生成内容。

// to consume messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    _, err := batch.Read(b)
    if err != nil {
        // err -> "invalid codec"
        break
    }
    fmt.Println(string(b))
}

batch.Close()
conn.Close()

但我无法使用上述代码进行消费。我收到错误invalid codec。可能是什么原因?

如果相关,我将最小批量大小调整为 1,以便它尝试消耗一些东西。

【问题讨论】:

  • 你能和控制台消费者一起消费吗?

标签: go apache-kafka kafka-consumer-api


【解决方案1】:

只是猜测: 尝试添加导入以加载压缩编解码器,以防您的主题使用压缩。

导入_“github.com/segmentio/kafka-go/snappy”

【讨论】:

    猜你喜欢
    • 2015-01-26
    • 1970-01-01
    • 2019-08-27
    • 2017-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-02
    • 2020-03-11
    相关资源
    最近更新 更多