【问题标题】:unable to read avro message via kafka-avro-console-consumer (end goal read it via spark streaming)无法通过 kafka-avro-console-consumer 读取 avro 消息(最终目标通过 spark 流读取它)
【发布时间】:2020-09-10 11:26:21
【问题描述】:

最终目标)在尝试我是否最终可以从 Confluent 平台中读取 avro 数据、usng 火花流,如下所述:Integrating Spark Structured Streaming with the Confluent Schema Registry

我想验证是否可以使用以下命令来阅读它们:

$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081

我收到此错误消息未知魔法字节

Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

注意,消息可以这样读取(使用控制台消费者而不是 avro-console-消费者):

kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml

消息是使用 confluent connect file-pulse (1.5.2) 读取 xml 文件 (streamthoughts/kafka-connect-file-pulse) 生成的

请在此处提供帮助: 我用错了kafka-avro-console-consumer 吗? 我尝试了此处描述的“反序列化器”属性选项:https://stackoverflow.com/a/57703102/4582240,没有帮助

我还不想勇敢地启动火花流来读取数据。

我使用的 file-pulse 1.5.2 属性如下所示添加 11/09/2020 以完成。

name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1

# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker

fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name
offset.strategy=name

【问题讨论】:

  • 主题是否有关联的 avro 架构?

标签: apache-kafka confluent-platform


【解决方案1】:

如果您从消费者那里获得 Unknown Magic Byte,那么生产者没有使用 Confluent AvroSerializer,并且可能推送了不使用 Schema Registry 的 Avro 数据。

没有看到生产者代码或消费和检查二进制格式的数据,很难知道是哪种情况。

消息是使用 confluent connect file-pulse 生成的

您是否在 AvroConverter 类中使用了 value.converter

【讨论】:

  • 嗨@OneCriketeer,我没有明确设置。我只是使用了默认的属性文件stackoverflow.com/questions/63796593/… myquestion 有我使用的内容。我会看看我能不能把我的最后一个贴在这里。
  • 我现在添加了原始问题中使用的属性。 @OneCricketeer。另请注意:我使用 kafka-console-consumer 消费/阅读消息没有问题。我确实对avro-console 消费者有意见。
  • 嗨@OneCricketeer,当你说“你使用value.converter”时,你的意思是在文件脉冲属性中吗?或者你的意思是当我通过 avro 控制台阅读时?无论哪种情况,我都没有使用它。我假设连接器默认使用它。
  • 在 OneCriketeer 的提示之后。我发现这篇文章真的很有帮助:confluent.io/blog/…
  • @Minnie connect-standalone.properties 中的默认值是 JSONConverter,我相信。而且它们不应该为空,因为那时 Connect 不知道如何序列化数据
猜你喜欢
  • 2021-12-28
  • 2019-12-31
  • 2016-10-08
  • 2018-01-12
  • 2021-04-13
  • 1970-01-01
  • 2022-11-25
  • 2020-10-12
  • 2017-04-04
相关资源
最近更新 更多