【Title】: How to use Kafka Connect to deliver Protobuf messages to Elasticsearch?
【Posted】: 2021-09-19 07:03:19
【Question】:

I'm using this Confluent Protobuf example:

https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Protobuf to produce messages of type Term based on this Protobuf file:

syntax = "proto3";
package confluent.kafka.examples.protobuf;
message Section {
  string Id = 1;
  string SectionName = 2;
}
message Term {
  string TermId = 1;
  string TermName = 2;
  repeated Section Section = 3;
}
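
For reference, a minimal sketch of the producing side, closely following the linked dotnet example; the Term and Section classes are assumed to be generated from the proto above (protoc maps the package to the Confluent.Kafka.Examples.Protobuf namespace by default), and the broker address is a placeholder:

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Examples.Protobuf;  // generated from the proto package above
    using Confluent.SchemaRegistry;
    using Confluent.SchemaRegistry.Serdes;

    class Program
    {
        static async Task Main()
        {
            var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
            var registryConfig = new SchemaRegistryConfig { Url = "http://schema-registry:8081" };

            using var registry = new CachedSchemaRegistryClient(registryConfig);
            using var producer = new ProducerBuilder<string, Term>(producerConfig)
                // Registers the Protobuf schema and frames each value with the
                // Schema Registry wire format (magic byte + schema id).
                .SetValueSerializer(new ProtobufSerializer<Term>(registry))
                .Build();

            var term = new Term { TermId = "123", TermName = "Term-123" };
            term.Section.Add(new Section { Id = "0", SectionName = "Section-123" });

            // Note: the key is written as a plain string, with no Schema Registry framing.
            var result = await producer.ProduceAsync("protobuftopic",
                new Message<string, Term> { Key = term.TermId, Value = term });
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }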

These messages need to make their way into Elasticsearch, so I added this ES sink connector:

PUT  localhost:8083/connectors/protobuftopic/config
    {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "type.name": "_doc",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "key.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "protobuftopic",
      "value.converter.schemas.enable": "true",
      "name": "protobuftopic”,
      "connection.url": "http://elasticsearch:9200",
      "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
      "key.ignore": "true",
      "schema.ignore": "true"
    }
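
For reference, a config like this can be pushed with any HTTP client, e.g. with curl, assuming the JSON above is saved to a file (the filename here is illustrative):

    curl -X PUT -H "Content-Type: application/json" \
         --data @protobuftopic-config.json \
         localhost:8083/connectors/protobuftopic/config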

Now we have a running connector:

localhost:8083/connectors/protobuftopic/status
{
  "name": "protobuftopic",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ],
  "type": "sink"
}

Then I produce some messages and print them with KSQL:

ksql> print protobuftopic from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2021/07/08 19:05:08.436 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "0" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.914 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "1" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.923 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "2" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.932 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "3" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.942 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "4" SectionName: "Section-123" }, partition: 0

When I check the status of my ES connector, I see the following:

    {
      "name": "protobuftopic",
      "connector": {
        "state": "RUNNING",
        "worker_id": "kafka-connect:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "FAILED",
          "worker_id": "kafka-connect:8083",
          "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\t
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\t
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic protobuftopic to Protobuf: \n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:131)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1\n


Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
        }
      ],
      "type": "sink"
    }

What am I doing wrong here?

【Comments】:

    Tags: apache-kafka-connect confluent-platform confluent-schema-registry


    【Solution 1】:

    In this particular case, the "Unknown magic byte" error was caused by the lack of an explicit key converter definition: the connector assumed Protobuf for the record keys as well and failed to deserialize them (note that the trace fails in WorkerSinkTask.convertKey, not on the value). The problem was solved by specifying

    "key.converter": "org.apache.kafka.connect.storage.StringConverter
    

    So the whole connector configuration looks like this:

    {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter.schemas.enable": "true",
        "tasks.max": "1",
        "topics": "protobuftopic",
        "value.converter.schemas.enable": "true",
        "name": "protobuftopic",
        "connection.url": "http://elasticsearch:9200",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "key.ignore": "true",
        "schema.ignore": "true",
        "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true
       
    }
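
    Once the updated config is in place, the failed task can be restarted via the Connect REST API and the result checked in Elasticsearch, for example (the index name assumes the connector default, which is the topic name):

        POST localhost:8083/connectors/protobuftopic/tasks/0/restart
        GET  localhost:8083/connectors/protobuftopic/status
        GET  localhost:9200/protobuftopic/_search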
    

    【Discussion】:
