【问题标题】:Kafka topic data to HDFS parquet file using HDFS sink connector configuration issue使用 HDFS 接收器连接器配置问题的 Kafka 主题数据到 HDFS parquet 文件
【发布时间】:2019-07-10 23:57:30
【问题描述】:

我需要有关我想以 parquet 格式(使用每日分区程序)放入 HDFS 的 kafka 主题的帮助。

我在 kafka 主题中有很多数据,基本上是这样的 json 数据:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

这个话题的名字是:test

我想将这些数据以 parquet 格式放入我的 HDFS 集群中。 但我对接收器连接器配置感到困惑。 为此,我使用了 confluent hdfs-sink-connector。

这是我到目前为止所做的事情:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"

  }
}

关于为什么我这样配置连接器的一些解释:

  • 我每天都有很多这样的数据填充我的主题
  • 最终目标是每天在我的 HDFS 中为此主题创建一个 parquet 文件

我知道也许我必须使用模式注册表将数据格式化为镶木地板,但我不知道该怎么做。有必要吗?

你能帮帮我吗?

谢谢

【问题讨论】:

    标签: json apache-kafka hdfs parquet apache-kafka-connect


    【解决方案1】:

    我没有亲自使用过ParquetFormat,但是你的数据必须有一个schema,这意味着以下之一

    1. 您的数据是使用 Confluent Avro 序列化程序生成的
    2. 您的数据以 Protobuf 的形式生成,并且您将 Protobuf 转换器添加到您的 Connect 工作人员中
    3. You use Kafka Connect's special JSON format that includes a schema within your records

    基本上,它不能是“纯 JSON”。 IE。您目前拥有"value.converter.schemas.enable": "true",我猜您的连接器无法正常工作,因为您的记录不是上述格式。

    基本上,如果没有模式,JSON 解析器就不可能知道 Parquet 需要编写哪些“列”。


    而且 Daily Partitioner 每天不会创建一个文件,只会创建一个目录。每个flush.size 将获得一个文件,并且还有一个用于刷新文件的计划轮换间隔的配置。此外,每个 Kafka 分区将有一个文件。


    此外,"consumer.auto.offset.reset": "earliest", 仅适用于 connect-distribtued.properties 文件,不适用于每个连接器的基础,AFAIK。


    由于我没有亲自使用过ParquetFormat,所以我能给出的建议就这么多了,但我已经使用other tools like NiFi 来实现类似的目标,这样您就无需更改现有的 Kafka 生产者代码。


    或者,改用JSONFormat,但是,Hive 集成将无法自动工作,并且表必须是预定义的(无论如何这都需要您为您的主题提供架构)。


    另一个选项就是configure Hive to read from Kafka directly

    【讨论】:

    • 感谢您的回答,我设法使用 Spark Structured Streaming 完成我想做的事情。
    • @Yrah 嗨,你能说得更详细点吗,或者如果你能得到一些 spark 结构化流的代码 sn-ps,即使我正在尝试同样的方法
    • @andani 随意创建您自己的帖子,询问您的用例
    • @cricket_007 如果你仔细阅读评论,即使我面临同样的问题,你也会知道我的问题是什么,你的建议对我不起作用所以我问他
    • @andani 我建议你创建一个新帖子,因为在 cmets 中开始讨论不会给你一个完整的详细答案
    猜你喜欢
    • 2016-12-26
    • 2017-04-07
    • 2018-06-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-15
    • 1970-01-01
    • 2019-01-21
    相关资源
    最近更新 更多