【发布时间】:2020-05-25 02:31:33
【问题描述】:
我每天运行命令将新记录插入 BigQuery 表,并想记录每天插入的记录数。
我创建了一个 QueryJob 对象,其中包含一个 SELECT 查询和一个 destination 表。我将write_disposition 设置为WRITE_APPEND,以便将新数据追加到表中。
我找到了两个做类似事情的选项,但都没有达到我想要的效果:
-
query_job.num_dml_affected_rows:这只是返回 None,因为查询不使用 DMLINSERT,而是附加到目标表。 -
query_job.result().total_rows:这将返回表中的总行数,而不是新行数。
我可以想出多种方法来达到预期的结果,但不确定最好的方法是什么:
- 将查询更改为 DML 插入 - 但这意味着创建动态 SQL 而不仅仅是使用目标表是一个 Python 变量。
- 将结果转储到临时表中,计算行数,然后追加数据 - 但这似乎效率低下。
- 计算查询前后的记录并记录增量 - 这可能会导致并行运行查询出现问题。
有没有最好的方法的建议?
跟进马斯喀特的回答,我认为当查询并行运行时这将不起作用:
Get number of rows: 1000 rows
Function to call queries in paralell:
- Query 1 --> Adds 100 rows --> finishes 3rd --> Counts 1200 rows
- Query 2 --> Adds 80 rows --> finishes 2nd --> Counts 1100 rows
- Query 3 --> Adds 20 rows --> finishes 1st --> Counts 1020 rows
因为无法知道这些查询将完成哪个顺序(因为它们都是使用 multiprocessing 库并行调用的),所以我不确定如何知道每个查询添加了多少行?
更新 2
示例代码:
...
# We compile a list of which datasets need to be loaded from
brands = self._bq.select(f"Select brand, gaDataset From {self.BRAND_DATASET}.{self.BRAND_TABLE}")
brands = list(brands.iterrows())
_, brands = zip(*brands)
# Define the function for parallel population
def populate_fn(brand):
return self._populate(brand, self.predicates)
logging.info("Populating daily stats for brands in parallel")
error = self._parallel_apply(populate_fn, brands)
if error is not None:
return error
def _populate(self, brand, predicates):
# We can't just call <bq_load_data> because we need to update the predicates for each brand
predicates.add_predicate('gaDataset', brand['gaDataset'], operator="_")
query_job = self._load_data(self.table_name, predicates=predicates)
logging.info(f"Started for {brand['gaDataset']}: {brand['brand']}")
self._run_query_job(query_job)
logging.info(f"{brand['gaDataset']}: {brand['brand']} is now populated.")
_populate 函数针对每个品牌并行运行。
predicates 只是一个处理如何修改 Jinja 模板化 SQL 的对象,具有来自主对象的一些常用参数,以及一些特定于品牌的参数。
_load_data 是一个函数,它实际加载带有适当参数的 Jinja 模板化 SQL,并构造并返回一个 QueryJob 对象。
【问题讨论】: