【Posted】: 2019-07-10 23:57:30
【Question】:
I need help with a Kafka topic whose data I want to write to HDFS in Parquet format (using the daily partitioner).
The topic contains a lot of data, basically JSON records like these:
{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy"]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
The topic is named: test
I want to land this data in my HDFS cluster in Parquet format, but I'm confused about the sink connector configuration. I'm using the Confluent hdfs-sink-connector for this.
Here is what I have done so far:
{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}
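Since I set `value.converter.schemas.enable` to `true`, my understanding from the Connect documentation is that `JsonConverter` then expects every message to carry an inline schema envelope, not the bare JSON shown above. A minimal sketch of what I think one of my records would have to look like (field names come from my data; the envelope layout and the record name `movie` are my assumptions):

```python
import json

# Kafka Connect's JsonConverter with schemas.enable=true expects each
# message value to be wrapped as {"schema": ..., "payload": ...}.
schema = {
    "type": "struct",
    "name": "movie",  # hypothetical record name, not from my data
    "fields": [
        {"field": "title", "type": "string"},
        {"field": "year", "type": "int32"},
        {"field": "cast", "type": "array", "items": {"type": "string"}},
        {"field": "genres", "type": "array", "items": {"type": "string"}},
    ],
}

payload = {
    "title": "Die Hard",
    "year": 1988,
    "cast": ["Bruce Willis", "Alan Rickman"],
    "genres": ["Action"],
}

# This is the string that would have to be produced to the "test" topic.
message = json.dumps({"schema": schema, "payload": payload})
print(message)
```

If that is right, it means my producers would have to repeat the schema in every single message, which is part of why I am asking about Schema Registry below.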
Some explanation of why I configured the connector this way:
- a lot of data like this lands in the topic every day
- the end goal is one Parquet file per day in HDFS for this topic
I know I may have to use Schema Registry to get the data into Parquet, but I don't know how to do that. Is it necessary?
Can you help me?
Thanks
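If Schema Registry is the way to go, I guess the converter section of my config would change to something roughly like this (just a sketch on my part; the `schema-registry:8081` host is a placeholder and I have not tested any of it):

```json
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081"
```

But then I suppose the producers would also have to write Avro instead of plain JSON, which is the part I really don't understand.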
【Discussion】:
Tags: json apache-kafka hdfs parquet apache-kafka-connect