【问题标题】:Cloud Dataflow to Join multiple filesCloud Dataflow 加入多个文件
【发布时间】:2018-11-06 15:50:05
【问题描述】:

我是 Google Cloud 的新手,我在 GCS 中有以下文件,需要设计一个数据流来合并文件并替换产品、位置文件中的值并将最终输出文件加载到 BigQuery。

  1. gs://testprojectxxxx/staging/actual_file.csv
  2. gs://testprojectxxxx/staging_timestamp/product.csv /location.csv

本地机器上的 Python 代码:

import pandas as pd

df1 = pd.read_csv("C:/Users/xxxx\\actual_file.csv")
df2 = pd.read_csv("C:/Users/xxxx_folder\\product.csv",header=None,names=['id', 'product_name'])
df3 = pd.merge(df1, df2, how='left', left_on='product_id', right_on='id')
df3.drop(['product_id_x', 'id'], axis=1,inplace=True)

df4 = pd.read_csv("C:/Users/xxxx_folder\\location.csv",header=None,names=['id', 'location_name'])
df5 = pd.merge(df3, df4, how='left', left_on='location_id', right_on='id')
df5.drop(['location_id_x', 'id'], axis=1,inplace=True)

df5.rename(columns={'product_name_y':'product_name','location_name_y':'location'}, inplace=True)
df5.to_csv('Final_file.csv', sep=',',encoding='utf-8', index=False)

感谢您的帮助。

【问题讨论】:

    标签: python google-cloud-platform google-cloud-dataflow google-cloud-datalab


    【解决方案1】:

    要加入这些行,您要使用 GroupByKeyCoGroupByKey

    https://beam.apache.org/releases/pydoc/2.8.0/apache_beam.transforms.core.html#apache_beam.transforms.core.GroupByKey

    查看文档https://beam.apache.org/documentation/programming-guide/#core-beam-transforms 中的第 4.2.3 节

    emails_list = [
        ('amy', 'amy@example.com'),
        ('carl', 'carl@example.com'),
        ('julia', 'julia@example.com'),
        ('carl', 'carl@email.com'),
    ]
    phones_list = [
        ('amy', '111-222-3333'),
        ('james', '222-333-4444'),
        ('amy', '333-444-5555'),
        ('carl', '444-555-6666'),
    ]
    
    emails = p | 'CreateEmails' >> beam.Create(emails_list)
    phones = p | 'CreatePhones' >> beam.Create(phones_list)
    
    # The result PCollection contains one key-value element for each key in the
    # input PCollections. The key of the pair will be the key from the input and
    # the value will be a dictionary with two entries: 'emails' - an iterable of
    # all values for the current key in the emails PCollection and 'phones': an
    # iterable of all values for the current key in the phones PCollection.
    results = ({'emails': emails, 'phones': phones}
               | beam.CoGroupByKey())
    
    def join_info(name_info):
      (name, info) = name_info
      return '%s; %s; %s' %\
          (name, sorted(info['emails']), sorted(info['phones']))
    
    contact_lines = results | beam.Map(join_info)
    

    【讨论】:

    • 感谢您的回复,最初将文件解压缩并复制到暂存桶的新文件夹中。我有 2 个问题 1. 如何在文件到达时触发数据流。 2.如何在数据流中设置路径来选择新文件。 parser = argparse.ArgumentParser() parser.add_argument('--input', dest='input', default='gs://dataflow-samples/', help='要处理的输入文件。')
    • 我不认为数据流能够“监听”新文件。理想情况下,上传文件的机制将启动这些特定文件的数据流。否则,您可以在应用引擎中设置一个 cron 作业以每 5 分钟启动一次并检查文件。如果 cron 作业找到文件,它应该将它们移动到不同的文件夹(为了区分新文件和旧文件),然后为这些路径启动数据流cloud.google.com/appengine/docs/standard/python/…
    猜你喜欢
    • 2019-03-04
    • 1970-01-01
    • 1970-01-01
    • 2018-10-15
    • 2015-07-02
    • 2019-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多