【问题标题】:dataflow HDF5 loading pipeline errors数据流 HDF5 加载管道错误
【发布时间】:2018-02-13 18:55:09
【问题描述】:

任何帮助将不胜感激!!!

我使用数据流来处理 H5 (HDF5 format) 文件。

为此,我创建了一个 setup.py 文件,该文件基于 juliaset 示例,该示例在其他票证之一中引用。我唯一的改变是要安装的软件包列表:

REQUIRED_PACKAGES = [
    'numpy',
    'h5py',
    'pandas',
    'tables',
    ]

管道如下:

import numpy as np
import h5py
import pandas as pd
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        yield (element, gcs.open(element).read())

class H5Preprocess(beam.DoFn):
    def process(self, element):
        logging.info('**********starting to read H5')
        h5py.File(element, 'r')
        logging.info('**********finished reading H5')
        expression = hdf['/data/']['expression']
        logging.info('**********finished reading the expression node')
        np_expression = expression[1:2,1:2]
        logging.info('**********subset the expression to numpy 2x2')
        yield (element, np_expression)

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    parser = argparse.ArgumentParser(description="read from h5 blog and write to file")
    #parser.add_argument('--input',help='Input for the pipeline', default='gs://archs4/human_matrix.h5')
    #parser.add_argument('--output',help='output for the pipeline',default='gs://archs4/output.txt')
    #known_args, pipeline_args = parser.parse_known_args(argv)
    logging.info('**********finish with the parser')

    # what does the args is relevant for? when the parameters are known_args.input and known_args.output
    #with beam.Pipeline(options=PipelineOptions(argv=pipeline_args)) as p:
    with beam.Pipeline(options=pipeline_options) as p:
            (p 
                | 'Initialize' >> beam.Create(['gs://archs4/human_matrix.h5']) 
                | 'Read-blobs' >> beam.ParDo(ReadGcsBlobs()) 
                | 'pre-process' >> beam.ParDo(H5Preprocess()) 
                | 'write' >> beam.io.WriteToText('gs://archs4/outputData.txt')
            )
    p.run()       

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

执行命令如下:

python beam_try1.py --job-name beam-try1 --project orielresearch-188115 --runner DataflowRunner --setup_file ./setup.py --temp_location=gs://archs4/tmp --staging_location gs://archs4/staging 

管道错误如下:

(5a4c72cfc5507714): Workflow failed. Causes: (3bde8bf810c652b2): S04:Initialize/Read+Read-blobs+pre-process+write/Write/WriteImpl/WriteBundles/WriteBundles+write/Write/WriteImpl/Pair+write/Write/WriteImpl/WindowInto(WindowIntoFn)+write/Write/WriteImpl/GroupByKey/Reify+write/Write/WriteImpl/GroupByKey/Write failed., (7b4a7abb1a692d12): A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f

您能否建议需要解决的问题?

谢谢, 爱拉兰

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    您是否尝试在本地运行器中运行数据子集?这可能会为您提供有关问题所在的更多信息。

    【讨论】:

    • 是的。我试过了 。我现在在 datalab 上运行代码。 CGS 正在打开文件并且 h5py.File 必须具有完整路径名而不是对象。我决定将 H5py 和梁结合起来。第一步是将数据从 H5py 提取到 numpy 数组或数据帧。第一步工作正常。是否可以将 numpy 数组传递给 Pipeline 以创建 PCollection,或者将其写入 BigQuery 或 GCS 的最佳方式是什么。该文件很长,从 datalab 写入不可靠,并且将永远花费。谢谢,艾拉
    • 据我所知,您不能将 numpy 数组按原样传递给 Pipeline。根据本文档 (cloud.google.com/dataflow/model/…),BigQueryIO 或 DatastoreIO 是创建有界 PCollection 的有效数据源。
    猜你喜欢
    • 1970-01-01
    • 2021-11-27
    • 1970-01-01
    • 1970-01-01
    • 2019-09-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多