【问题标题】:.partitionBy('id') when writing to BigQuery tables in PySpark.partitionBy('id') 在 PySpark 中写入 BigQuery 表时
【发布时间】:2019-11-18 16:30:21
【问题描述】:

我正在将 BigQuery 中的表读入 Spark。该表具有以下列:id(int)、string1(string)、string2(string)、string3(string)、date(date)、timestamp(int)。现在我想根据它们的 id 将生成的数据帧的行写入 BigQuery 中的分隔表。例如,有 700 行 id 为 1,我想创建一个表 project.dataset.id1,并将所有这 700 行(所有列)写入该表,然后所有 id 为 2 的行都应该去一个表project.dataset.id2 等等。

如果我正在写入文本文件,我将使用 write.partitionBy('id'),那么在写入 BigQuery 时我可以做什么?

我尝试使用 python 来解决它,但是速度很慢,我想知道是否有更好的方法。

这是我迄今为止尝试过的:

#!/usr/bin/python

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from google.cloud import bigquery

sc = SparkContext().getOrCreate()

sc.setLogLevel("ERROR")

spark = SparkSession\
    .builder\
    .appName("example-spark")\
    .getOrCreate()

df = spark.read.format('bigquery').option('table', 'project.dataset.input_table').load()

ids = df.select("id").distinct().rdd.flatMap(lambda x: x).collect()
dfs = [df.where(df["id"] == id) for id in ids]

client = bigquery.Client()
dataset_ref = client.dataset('test2')

for df in dfs:
    id = str(df.select("id").distinct().rdd.flatMap(lambda x: x).collect()[0])
    table_name = "source_table_%s" % (id)
    table_ref = dataset_ref.table(table_name)
    schema = [
                bigquery.SchemaField('source_id', 'INT64', mode='REQUIRED'),
                bigquery.SchemaField('string1', 'STRING', mode='NULLABLE'),
                bigquery.SchemaField('string2', 'STRING', mode='NULLABLE'),
                bigquery.SchemaField('string3', 'STRING', mode='NULLABLE'),
                bigquery.SchemaField('date', 'DATE', mode='NULLABLE'),
                bigquery.SchemaField('timestamp', 'INT64', mode='NULLABLE')
            ]
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)
    print('table {} created.'.format(table.table_id))
    table_path = "project.dataset.%s" % (table_name)
    df.write.format('bigquery').option('table', table_path).mode("overwrite").save()

我正在考虑按 id 分组/分区,然后使用用于 BigQuery 的 python API 为每个组创建一个以它的 id 命名的新表,然后使用 .write.format('bigquery') 将行写入该表。

我可以使用什么函数按 id 将我的数据帧“拆分”为多个部分,然后遍历每个部分以创建表并执行写入?

另外,应用该函数后我将使用什么数据类型(如何访问 id 来命名表)?

【问题讨论】:

    标签: python pyspark google-bigquery


    【解决方案1】:

    抱歉,这并不能直接回答问题,但我想提出一种不同的方法。您可以创建一个按 id 列分区的表,而不是在 BigQuery 中创建和管理大量表。

    阅读有关整数分区的更多信息: https://cloud.google.com/bigquery/docs/creating-integer-range-partitions

    设置完成后,您只需写入这个单一的表,BigQuery 就会为您进行分区。当您阅读时,使用过滤器阅读 FROM Table WHERE id = 12345 将取代阅读 FROM TableId12345。这通常更易于管理且更易于实施。

    【讨论】:

    • 嗨,我正在考虑,但出于以下原因决定反对:整数分区仍处于测试阶段,目前每个表的分区限制为 4000 个,即使他们允许更多分区对分区表的操作数量的限制。所以使用桌子可能很困难。相反,我打算在以后使用表时使用带有 _TABLE_SUFFIX 的通配符表来获得类似的效果。但我一定会关注整数分区,希望它很快适合生产。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-08-14
    • 2022-08-06
    • 2021-09-04
    • 1970-01-01
    • 2022-11-07
    • 2021-12-14
    • 1970-01-01
    相关资源
    最近更新 更多