【发布时间】:2020-01-05 22:39:30
【问题描述】:
我正在尝试使用 Airflow 构建数据迁移管道,源是 Dataproc 集群上的 Hive 表,目标是 BigQuery。我正在使用 DataprocHiveOperator 来获取架构以及来自源的数据。此运算符在内部使用 Dataproc REST API 在我们指定的 Dataproc 集群上提交和执行作业。输出将作为作业日志的一部分写入文件 Google 云存储。我只需要这些日志中的查询结果。
到目前为止,我已经修改了 gcp_dataproc_hook.py 代码,通过 driverOutputResourceUri 参数将输出文件的内容下载为字符串,从而将输出返回给调用方法。此输出的返回类型是 Pandas 数据框(可以根据我们的方便更改为任何其他类型)。但这包括完整的日志。我必须从中提取查询结果。
这是我在 gcp_dataproc_hook.py 中添加的代码 sn-p 以返回已提交查询的输出日志:
#download the output
def getOutput(self,project, output_bucket,output_path):
client = storage.Client(project=self.project_id)
bucket = client.get_bucket(output_bucket)
output_blob = ('/'.join(output_path)+"."+"000000000")
return bucket.blob(output_blob).download_as_string()
#get logs including query output
def getQueryResult(self):
result=self.job_ouput
output = self.getOutput(result['reference']['projectId'],result['driverOutputResourceUri'].split('/')[2],result['driverOutputResourceUri'].split('/')[3:])
df = pd.read_csv(io.BytesIO(output), sep='\n|', nrows=500000, engine='python')
return df
这是我尝试执行的示例查询:
SHOW CREATE TABLE my_tbl;
输出日志如下所示:
Connecting to jdbc:hive2://prod-metastore-test-cluster1-m:10000
0 Connected to: Apache Hive (version 2.3.5)
1 Driver: Hive JDBC (version 2.3.5)
2 Transaction isolation: TRANSACTION_REPEATABLE_...
3 . . . . . . . . . . . . . . . . . . . . . . .>...
4 | createtab_stmt ...
5 +---------------------------------------------...
6 | CREATE TABLE `my_tbl`( ...
7 | `col1` string, ...
8 | `col2` bigint, ...
9 | `col3` string, ...
.. ...
141 | `coln` string) ...
142 | ROW FORMAT SERDE ...
143 | 'org.apache.hadoop.hive.ql.io.orc.OrcSerde...
144 | STORED AS INPUTFORMAT ...
145 | 'org.apache.hadoop.hive.ql.io.orc.OrcInput...
146 | OUTPUTFORMAT ...
147 | 'org.apache.hadoop.hive.ql.io.orc.OrcOutpu...
148 | LOCATION ...
149 | 'gs://my_hive_data_bucket/tmp/base_table/my_tbl...
150 | TBLPROPERTIES ( ...
151 | 'transient_lastDdlTime'='1566842329') ...
152 +---------------------------------------------...
153 143 rows selected (0.154 seconds)
154 Beeline version 2.3.5 by Apache Hive
155 Closing: 0: jdbc:hive2://prod-metastore-test-c...
预期的输出应该是这样的:
CREATE TABLE `my_tbl`(
`col1` string,
`col2` bigint,
`col3` string,
..
`coln` string,
)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'gs://my_hive_data_bucket/tmp/base_table/my_tbl'
TBLPROPERTIES (
'transient_lastDdlTime'='1566842329')
请建议我一种可以接近解决方案的方法。
【问题讨论】:
标签: python pandas hive airflow google-cloud-dataproc