[Question Title]: Kafka Logstash Avro integration failing
[Posted]: 2019-12-05 16:12:17
[Question]:

I am trying to consume a topic from Kafka using the Avro deserializer in Logstash and am getting the error below.

Here is my Logstash configuration file:

    input {
      kafka {
        bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3.com:9911"
        topics => "EMS.Elastic_new"
        auto_offset_reset => earliest
        group_id => "logstash106"
        ssl_truststore_location => "/apps/opt/application/elasticsearch/logstash-7.1.1/kafka_files/kafka.client.truststore.jks"
        ssl_truststore_password => "xxxx"
        security_protocol => "SSL"
        key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
        value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
        codec => avro_schema_registry {
          endpoint => "https://kafka1:9990"
          subject_name => "EMS.Elastic_new"
          schema_id => 170
          schema_uri => "/apps/opt/application/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
          tag_on_failure => true
          register_schema => true
        }
      }
    }

    output {
      elasticsearch {
        index => "smd_etms_es2"
        document_id => "%{tktnum}%"
        action => "update"
        doc_as_upsert => "true"
        retry_on_conflict => 5
        hosts => ["npes1:9200"]
      }
      stdout { codec => rubydebug }
    }


    [ERROR][logstash.inputs.kafka ] Unable to create Kafka consumer from given configuration
      {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer,
       :cause=>io.confluent.common.config.ConfigException: Missing required configuration
       "schema.registry.url" which has no default value.}
    [2019-07-26T16:58:22,736][ERROR][logstash.javapipeline ] A plugin had an unrecoverable error.
      Will restart this plugin. Pipeline_id:main

I have provided the schema URI in the codec, but Logstash is not reading the setting.

[Comments]:

  • This is my logstash.conf.
  • Please find the attached logstash.conf.
  • Also, here is the list of jars I added to Logstash: kafka-avro-serializer-5.0.0.jar, kafka-clients-2.0.0.jar, common-config-5.0.0.jar, common-utils-5.0.0.jar, kafka-schema-registry-client-5.0.0.jar, avro-1.8.2.jar, avro-maven-plugin-1.8.2.jar, avro-compiler-1.8.2.jar, jackson-mapper-asl-1.9.13.jar, jackson-core-asl-1.9.13.jar
  • Try setting your deserializers to plain byte arrays: github.com/revpoint/logstash-codec-avro_schema_registry/blob/…

Tags: apache-kafka logstash avro


[Solution 1]:

The error

    Missing required configuration "schema.registry.url"

comes from these settings:

    key_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"

Based on the example code, the codec expects you to use org.apache.kafka.common.serialization.ByteArrayDeserializer for both, and the avro_schema_registry codec then handles schema management itself through its endpoint parameter.
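
A minimal sketch of the adjusted kafka input, assuming only the deserializer swap described above and keeping the brokers, truststore paths, and codec settings exactly as posted in the question:

    input {
      kafka {
        bootstrap_servers => "kafka1:9911,kafka2:9911,kafka3.com:9911"
        topics => "EMS.Elastic_new"
        auto_offset_reset => earliest
        group_id => "logstash106"
        ssl_truststore_location => "/apps/opt/application/elasticsearch/logstash-7.1.1/kafka_files/kafka.client.truststore.jks"
        ssl_truststore_password => "xxxx"
        security_protocol => "SSL"
        # Pass the raw Avro bytes through to the codec; it is the Confluent
        # KafkaAvroDeserializer that demands schema.registry.url in the consumer properties.
        key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        codec => avro_schema_registry {
          # The codec contacts the schema registry itself via this endpoint,
          # so no schema.registry.url consumer property is required.
          endpoint => "https://kafka1:9990"
          subject_name => "EMS.Elastic_new"
          schema_id => 170
          schema_uri => "/apps/opt/application/elasticsearch/logstash-7.1.1/kafka_files/ticketInfo.avsc"
          tag_on_failure => true
          register_schema => true
        }
      }
    }

With the byte-array deserializers in place, the Kafka consumer is constructed without any Confluent-specific configuration, and Avro decoding is handled entirely by the codec.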

[Discussion]:
