【Posted At】:2019-03-26 12:44:42
【Problem Description】:
I am new to Apache Beam and I am trying to write a pipeline in Python that extracts data from Google BigQuery and writes it to GCS in CSV format.
Using beam.io.Read(beam.io.BigQuerySource()) I can read the data from BigQuery, but I don't know how to write it to GCS in CSV format.
Is there a custom function to achieve this? Could you please help me?
import logging
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition
PROJECT='project_id'
BUCKET='project_bucket'
def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]
    with beam.Pipeline(argv=argv) as p:
        # Execute the SQL query in BigQuery and read the result set into a PCollection.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='Select * from `dataset.table`',
                                   use_standard_sql=True))

        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help

        # Write the result set into the given destination BigQuery table.
        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
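
A minimal sketch of one possible shape for the missing step (assumptions, not part of the original pipeline: the query rows come back as Python dicts with the name, gender and count fields from the schema above, and gs://{BUCKET}/output/ is a placeholder output prefix):

def to_csv_line(row):
    # Explicit column order; CSV quoting/escaping is omitted for brevity.
    return ','.join([str(row['name']), str(row['gender']), str(row['count'])])

(BQ_SQL_TO_TABLE
 | 'format_csv' >> beam.Map(to_csv_line)
 | 'write_to_gcs' >> beam.io.WriteToText(
     'gs://{0}/output/results'.format(BUCKET),
     file_name_suffix='.csv',
     header='name,gender,count'))

Note that WriteToText shards its output into several files by default (results-00000-of-0000N.csv, ...), so this does not produce a single CSV file unless num_shards is forced to 1.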
【Question Discussion】:
- Welcome to Stack Overflow! Please take the tour and visit the help center to get the most out of this site. Please also share the relevant parts of the code you have developed so far; that helps pinpoint where the problem is.
Tags: python google-bigquery google-cloud-dataflow apache-beam