【问题标题】:How to extract the query result from a Hive job output logs using DataprocHiveOperator?如何使用 DataprocHiveOperator 从 Hive 作业输出日志中提取查询结果?
【发布时间】:2020-01-05 22:39:30
【问题描述】:

我正在尝试使用 Airflow 构建数据迁移管道,源是 Dataproc 集群上的 Hive 表,目标是 BigQuery。我正在使用 DataprocHiveOperator 来获取架构以及来自源的数据。此运算符在内部使用 Dataproc REST API 在我们指定的 Dataproc 集群上提交和执行作业。输出将作为作业日志的一部分写入文件 Google 云存储。我只需要这些日志中的查询结果。

到目前为止,我已经修改了 gcp_dataproc_hook.py 代码,通过 driverOutputResourceUri 参数将输出文件的内容下载为字符串,从而将输出返回给调用方法。此输出的返回类型是 Pandas 数据框(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。

这是我在 gcp_dataproc_hook.py 中添加的代码 sn-p 以返回已提交查询的输出日志:

    #download the output
    def getOutput(self,project, output_bucket,output_path):
        client = storage.Client(project=self.project_id)
        bucket = client.get_bucket(output_bucket)
        output_blob = ('/'.join(output_path)+"."+"000000000")
        return bucket.blob(output_blob).download_as_string()

    #get logs including query output
    def getQueryResult(self):
        result=self.job_ouput
        output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
        df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
        return df

这是我尝试执行的示例查询:

SHOW CREATE TABLE my_tbl;

输出日志如下所示:

Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0            Connected to: Apache Hive (version 2.3.5)             
1                    Driver: Hive JDBC (version 2.3.5)             
2    Transaction isolation: TRANSACTION_REPEATABLE_...             
3    . . . . . . . . . . . . . . . . . . . . . . .>...             
4    |                   createtab_stmt            ...             
5    +---------------------------------------------...             
6    | CREATE TABLE `my_tbl`(       ...             
7    |   `col1` string,            ...             
8    |   `col2` bigint,                   ...             
9    |   `col3` string,                 ...                         
..                                                 ...                         
141  |   `coln` string)                 ...             
142  | ROW FORMAT SERDE                            ...             
143  |   'org.apache.hadoop.hive.ql.io.orc.OrcSerde...             
144  | STORED AS INPUTFORMAT                       ...             
145  |   'org.apache.hadoop.hive.ql.io.orc.OrcInput...             
146  | OUTPUTFORMAT                                ...             
147  |   'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...             
148  | LOCATION                                    ...             
149  |   'gs://my_hive_data_bucket/tmp/base_table/my_tbl...             
150  | TBLPROPERTIES (                             ...             
151  |   'transient_lastDdlTime'='1566842329')     ...             
152  +---------------------------------------------...             
153                  143 rows selected (0.154 seconds)             
154               Beeline version 2.3.5 by Apache Hive             
155  Closing: 0: jdbc:hive2://prod-metastore-test-c... 

预期的输出应该是这样的:

CREATE TABLE `my_tbl`(
  `col1` string,
  `col2` bigint,
  `col3` string,
  ..
  `coln` string,  
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
  'transient_lastDdlTime'='1566842329')

请建议我一种可以接近解决方案的方法。

【问题讨论】:

    标签: python pandas hive airflow google-cloud-dataproc


    【解决方案1】:

    在 Dataproc 中,Hive 查询使用 Beeline 而不是已弃用的 Hive CLI,这就是默认情况下格式不同的原因。 Beeline 通常会以花哨的边框格式格式化人类可读的输出,而不是更容易解析的格式。

    幸运的是,有一些直线选项可以使格式非常接近旧 Hive CLI 所做的。您可以简单地创建一个初始化操作,在创建 Dataproc 集群时添加到选项中,并在您的 Airflow 运算符中指定 init_actions_uris。创建一个包含以下内容的文件:

    #!/bin/bash
    
    sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline
    

    然后将该文件上传到 GCS,例如 gs://some-gcs-bucket/beeline-legacyfmt.sh,并将该 GCS URI 设置为 Dataproc 集群的初始化操作。默认情况下,这将应用直线所需的命令行选项。然后,您发送的任何 Dataproc Hive 作业现在将以“tsv2”和“静默”模式输出,这意味着没有多余的日志语句,并且输出将是原始 tsv。

    【讨论】:

    • 抱歉延迟回复。我试过了,发现它有效。谢谢你,丹尼斯 :)
    • 嗨@丹尼斯。我希望你做的好。几个月后我想再次做这个练习。但没有得到相同的结果。即使使用了静默标志,日志也会显示在查询结果中。它在直线 CLI 中运行:beeline -u jdbc:hive2://localhost:10000 --outputformat=tsv2 --silent=true -e "select * from my_tbl limit 5" 但当我向 Dataproc 集群提交作业时却没有。我正在使用您建议的相同命令: sudo sed -i 's/beeline/beeline --outputformat=tsv2 --silent=true/' /usr/bin/beeline 你能尽快帮助我吗?谢谢。
    • 嗯,我想知道次要版本是否发生了意外变化——截至 9 月 18 日,默认版本应该是 1.3.43-debian9,而当前默认版本是 1.3.45-debian9。如果您能够重试,也许可以尝试使用--image-version=1.3.43-debian9 创建集群,看看它是否像以前那样工作?
    • 感谢丹尼斯的快速回复。我注意到我正在使用自定义图像。我将其更改为基本的 Debian 映像(1.4-debian9),它工作正常。如果您知道的话,如果您可以指导我在自定义操作系统映像上覆盖这些参数,那将会很有帮助。感谢您的响应:)
    猜你喜欢
    • 2018-01-16
    • 1970-01-01
    • 1970-01-01
    • 2011-04-05
    • 2020-08-04
    • 1970-01-01
    • 2019-02-10
    • 2016-02-17
    • 1970-01-01
    相关资源
    最近更新 更多