【问题标题】:Kafka producer using HiveStorageHandler使用 HiveStorageHandler 的 Kafka 生产者
【发布时间】:2019-04-27 04:34:57
【问题描述】:

我对 hive/hadoop 比较陌生

我正在阅读这个Hive Storage Handlers

现在我正在尝试编写 HiveStorageHandler 的自定义实现,用于使用 Hive 表查询消息并将消息推送到 Kafka。

我看到 HiveStorageHandler 的其他实现允许我们使用 hive 表在 NoSQL 数据库上查询和写入。

我正在尝试为 Kafka 复制它。我在上面找到了一个项目

HiveKa - query Kafka using Hive

在这里,他们尝试使用对 hive 表的查询从 Kafka 读取数据。我希望在表格上使用 insert 来写关于 kafka 的主题。

有人可以指导我吗?

【问题讨论】:

    标签: hadoop hive apache-kafka kafka-producer-api


    【解决方案1】:

    如果我理解正确,您想从 Hive 读取事件,然后推送到 Kafka。我没有存储处理程序的经验,但我宁愿建议编写适当的代码以生成到 Kafka,然后将这些事件提供给 Hadoop/Hive。

    在 Kafka 中有一个称为 Kafka Connect 的框架,它可以写入外部系统。Confluent 已经为 HDFS 编写了这样一个连接器,它通过在将文件写入 HDFS 时更新 Hive 元存储来提供 Hive 支持。

    在不编写存储处理程序的情况下,您可以尝试使用 JDBC 源连接器,或者使用 Spark/Flink 从 Hive 读取该数据并推送到 Kafka。

    不过,一般来说,Hadoop 是 CDC 事件的目的地,而不是它的生成源。主要是因为查询速度很慢...如果您想在插入时创建事件,通常需要进行一些表扫描,因此从 Cassandra/Hbase 生成事件可能是更好的选择

    【讨论】:

    • 我希望在配置单元表上使用插入语句写入 Kafka。我不打算查询 Kafka。
    • 我理解那部分。我想我只是想了解原因,因为您可以立即将插入事件正确地放入 Kafka,然后将它们放入 Hive,而不是反过来
    【解决方案2】:

    我想在表格上使用 insert 来写关于 kafka 的话题。

    这可以使用 Kafka HiveStorageHandler。以下是此功能可能的一般用例

    1. 查询 Kafka 主题
    2. 从 Kafka 主题查询数据并插入到 hive 托管/外部 表
    3. 从 Kafka 主题中查询数据并推送到其他 Kafka 主题中
    4. 从 Hive 外部/托管表中查询数据并推送到 Kafka 主题中

    您正在尝试执行第 3 个用例。

    首先为源和目标 Kafka 主题创建两个外部表。

    create external table if not exists source_topic_table
    (
    <fields>
    )
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES (
    'kafka.topic'='source_topic_name',
    'kafka.bootstrap.servers'=''
    );
    
    
    create external table if not exists target_topic_table
    (
    <fields>
    )
    STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
    TBLPROPERTIES (
    'kafka.topic'='target_topic_name',
    'kafka.bootstrap.servers'=''
    );
    

    然后使用合并查询将数据插入目标Kafka主题

    merge into target_topic_table
    using (
    select
    <columns>,
    cast(null as binary) as `__key`,
    cast(null as int) as `__partition`,
    cast(-1 as bigint) as `__offset`,
    cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
    from source_topic_table
    ) sub
    on
    sub.column_name = target_topic_table.coulmn_name <Some condition>
    when not matched then insert values
    (
    <sub.columns>,
    sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
    );
    

    注意:

    1. 使用 Hive 外部非原生表

    2. 除了用户定义的有效负载模式之外,Kafka 存储处理程序还附加了 4 个附加列(__key、__partition、__offset、__timestmap),用户可以使用这些列来查询 Kafka 元数据字段

    3. 如果数据不是 csv 格式,用户必须设置 'kafka.serde.class' 表属性

    4. 用户还可以设置“kafka.write.semantic”表属性,允许 NONE、AT_LEAST_ONCE 或 EXACTLY_ONCE 值。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-23
      • 2019-10-31
      • 2016-01-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多