【发布时间】:2019-11-18 16:30:21
【问题描述】:
我正在将 BigQuery 中的表读入 Spark。该表具有以下列:id(int)、string1(string)、string2(string)、string3(string)、date(date)、timestamp(int)。现在我想根据它们的 id 将生成的数据帧的行写入 BigQuery 中的分隔表。例如,有 700 行 id 为 1,我想创建一个表 project.dataset.id1,并将所有这 700 行(所有列)写入该表,然后所有 id 为 2 的行都应该去一个表project.dataset.id2 等等。
如果我正在写入文本文件,我将使用 write.partitionBy('id'),那么在写入 BigQuery 时我可以做什么?
我尝试使用 python 来解决它,但是速度很慢,我想知道是否有更好的方法。
这是我迄今为止尝试过的:
#!/usr/bin/python
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from google.cloud import bigquery
sc = SparkContext().getOrCreate()
sc.setLogLevel("ERROR")
spark = SparkSession\
.builder\
.appName("example-spark")\
.getOrCreate()
df = spark.read.format('bigquery').option('table', 'project.dataset.input_table').load()
ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
dfs = [df.where(df["id"] == id) for id in ids]
client = bigquery.Client()
dataset_ref = client.dataset('test2')
for df in dfs:
id = str(df.select("id").distinct().rdd.flatMap(lambda x: x).collect()[0])
table_name = "source_table_%s" % (id)
table_ref = dataset_ref.table(table_name)
schema = [
bigquery.SchemaField('source_id', 'INT64', mode='REQUIRED'),
bigquery.SchemaField('string1', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('string2', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('string3', 'STRING', mode='NULLABLE'),
bigquery.SchemaField('date', 'DATE', mode='NULLABLE'),
bigquery.SchemaField('timestamp', 'INT64', mode='NULLABLE')
]
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)
print('table {} created.'.format(table.table_id))
table_path = "project.dataset.%s" % (table_name)
df.write.format('bigquery').option('table', table_path).mode("overwrite").save()
我正在考虑按 id 分组/分区,然后使用用于 BigQuery 的 python API 为每个组创建一个以它的 id 命名的新表,然后使用 .write.format('bigquery') 将行写入该表。
我可以使用什么函数按 id 将我的数据帧“拆分”为多个部分,然后遍历每个部分以创建表并执行写入?
另外,应用该函数后我将使用什么数据类型(如何访问 id 来命名表)?
【问题讨论】:
标签: python pyspark google-bigquery