有人已经提到了streaming data solution,但是如果您尝试移动大量日志数据而不是设置连续流,您可能希望采取使用异步加载作业的路线.
GCS library 在 Google App Engine 中使用时就像大多数 python 文件库一样,可以将要导入的文件存储在云存储桶中:
import cloudstorage as gcs
filePath = "/CloudStorageBucket/dir/dir/logs.json"
with gcs.open(filePath, "w") as f:
f.write(SomeLogData)
f.close()
您可以通过 API 创建加载作业,指示 Big Query 在 Cloud Storage 中加载 CSV 或换行符分隔的 JSON 文件列表:(注意:您需要 use oauth 2)
from apiclient.discovery import build
service = build("bigquery", "v2", http = oAuthedHttp)
job = {
"configuration": {
"load": {
"sourceUris": ["gs://CloudStorageBucket/dir/dir/logs.json"],
"schema": {
"files" : [
{"name": "Column1",
"type": "STRING"},
...
]
},
"destinationTable": {
"projectId": "Example-BigQuery-ProjectId",
"datasetId": "LogsDataset",
"tableId": "LogsTable"
},
"sourceFormat" : "NEWLINE_DELIMITED_JSON"
"createDisposition": "CREATE_IF_NEEDED"
}
}
}
response = service.jobs().insert(
projectId = "Example-BigQuery-ProjectId",
body = job
).execute()
如果您想设置其他属性,例如写入处置或跳过 CSV 文件中的行,您可以阅读有关如何创建 Big Query load jobs 的更多信息。您还可以查看other good examples 如何加载数据,包括命令行提示。
编辑:
回答您更具体的问题:
这是实用的解决方案吗?
是的。我们使用延迟任务将 Google App Engine 日志导出到 Cloud Storage 并导入到 BigQuery。有些人使用了map reduce jobs,但如果您不需要洗牌或减少,这可能是矫枉过正。
日志数据结构经常更改,这会导致错误
当插入到 BigQuery 时。我们将如何在 python 脚本中处理它?
除非您在消息到达大查询之前对其进行解析,否则这应该不是问题。更好的设计是将消息、时间戳、级别等移植到 Big Query,然后通过那里的查询对其进行消化。
以防万一,我们必须在特定时期重新运行日志数据。我们怎么能做到这一点?需要写python脚本吗?
流式传输数据不会为您提供备份,除非您自己在 BigQuery 中进行设置。使用我上面概述的方法会自动在 Google Cloud Storage 中为您提供备份,这是首选。
知道 BigQuery 是一个 OLAP 数据库,而不是事务性数据库,因此通常最好在每次添加更多日志数据时重建表,而不是尝试插入新数据。这是违反直觉的,但 BigQuery 就是为此而设计的,因为它一次是 can import 10,000 files / 1TB。将分页与作业写入配置一起使用,理论上您可以相当快速地导入数十万条记录。如果您不关心备份日志,则流式传输数据将是理想的选择。