【发布时间】:2020-12-23 13:12:14
【问题描述】:
我想每小时设置一个管道来解析 GCS 存储桶不同文件夹中的 2000 个原始 protobuf 格式文件,并将数据加载到大查询中。到目前为止,我能够成功解析 proto 数据。
我知道读取文件夹中所有文件的通配符方法,但我现在不想这样做,因为我有来自不同文件夹的数据,我想像并行一样更快地运行它,而不是按顺序运行
如下图
for x,filename enumerate(file_separted_comma):
--read data from prto
--load data to bigquery
现在我想知道以下方法是否是从 apache 梁中的不同文件夹解析多个文件并将数据加载到大查询中的最佳或推荐方法。
还有一件事,从proto解析后的每条记录,我都把它变成JSON记录来加载到大查询中,不知道这也是将数据加载到大查询而不是直接加载反序列化的好方法(解析)原始数据。
我正在从 Hadoop 作业转移到数据流,以通过设置此管道来降低成本。
我是 apache-beam 的新手,不知道什么是优缺点,因此有人可以看看代码并在这里帮助我制定更好的生产方法
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems
def get_deserialized_log(serialized_log):
log = rtbtracker_log_pb2.RtbTrackerLogProto()
log.ParseFromString(serialized_log)
return log
def print_row(message):
message=message[3]
message = message.replace('_', '/');
message = message.replace('*', '=');
message = message.replace('-', '+');
#finalbunary=base64.b64decode(message.decode('UTF-8'))
finalbunary=base64.b64decode(message)
msg=get_deserialized_log(finalbunary)
jsonObj = MessageToDict(msg)
#jsonObj = MessageToJson(msg)
return jsonObj
def parse_file(element):
for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
return line
def run():
parser = argparse.ArgumentParser()
parser.add_argument("--input", dest="input", required=False)
parser.add_argument("--output", dest="output", required=False)
app_args, pipeline_args = parser. parse_known_args()
with beam.Pipeline(options=PipelineOptions()) as p:
input_list=app_args.input
file_list = input_list.split(",")
res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
for i,file in enumerate(file_list):
onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
#i want to load to bigquery here
##LOAD DATA TO BIGQUERY
#secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json",
###num_shards=1 ,
##append_trailing_newlines = True)
if __name__ == '__main__':
run()
在本地运行下面的代码
python3 another_main.py --input=tracker_one.gz,tracker_two.gz
我没有提到输出路径,因为我不想将数据保存到 gcs,因为我将把它加载到 bigquery 中
就像下面在 dataflowrunner 中运行的一样
python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing
【问题讨论】:
标签: python protocol-buffers google-cloud-dataflow apache-beam