【Question Title】: How to deploy Google Cloud Dataflow with connection to PostgreSQL (beam-nuggets) from Google Cloud Functions
【Posted】: 2020-07-13 12:17:16
【Question Description】:

I am trying to build an ETL in GCP that reads part of the data from PostgreSQL and puts it, suitably reshaped, into BigQuery. I was able to deploy the Dataflow job that performs this task from my own computer, but I failed to make it dynamic, so that it reads the last transferred record and transfers the next 100. So I figured I would launch the Dataflow job from a Cloud Function. Everything works fine; reading from and writing to BigQuery works like a charm, but I am stuck on the package needed for PostgreSQL: beam-nuggets.

In the function I create the pipeline arguments:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipe_arguments = [
    '--project={0}'.format(PROJECT),
    '--staging_location=gs://xxx.appspot.com/staging/',
    '--temp_location=gs://xxx.appspot.com/temp/',
    '--runner=DataflowRunner',
    '--region=europe-west4',
    '--setup_file=./setup.py'
]

pipeline_options = PipelineOptions(pipe_arguments)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

Then I create the pipeline:

pipeline = beam.Pipeline(argv=pipe_arguments)

and run it:

pipeline.run()
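
Put together, the function body looks roughly like this (a consolidated sketch; the entry-point name, PROJECT value, and the omitted transforms are placeholders, not my exact code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

PROJECT = 'xxx'  # placeholder

def launch_dataflow(request):  # hypothetical HTTP-trigger entry point
    pipe_arguments = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://xxx.appspot.com/staging/',
        '--temp_location=gs://xxx.appspot.com/temp/',
        '--runner=DataflowRunner',
        '--region=europe-west4',
        '--setup_file=./setup.py',
    ]
    pipeline_options = PipelineOptions(pipe_arguments)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    pipeline = beam.Pipeline(argv=pipe_arguments)
    # ... PostgreSQL -> BigQuery transforms go here ...
    pipeline.run()  # submits the job to Dataflow; it runs asynchronously
    return 'job submitted'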

If I omit the line

    '--setup_file=./setup.py'

everything works fine, except that Dataflow cannot use PostgreSQL, because the import

from beam_nuggets.io import relational_db

fails.
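
For context, that import is what provides the PostgreSQL source; it is used roughly like this (a sketch following the beam-nuggets README, with placeholder connection details):

from beam_nuggets.io import relational_db

# Placeholder credentials; pg8000 is the PostgreSQL driver that
# beam-nuggets pulls in as a dependency.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='1.2.3.4',
    port=5432,
    username='user',
    password='password',
    database='mydb',
)

records = (
    pipeline
    | 'ReadFromPostgres' >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='my_table',
    )
)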

When I add the

    '--setup_file=./setup.py'

line back in, testing the function from the GCP Functions portal returns:

Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']' returned non-zero exit status 1.
,          output of the failed child process b'running sdist\nrunning egg_info\ncreating example.egg-info\n'

Running

python setup.py sdist --dist-dir ./tmp/

from my local machine works fine.

setup.py is deployed to the Cloud Function together with the function code (main.py) and requirements.txt.
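
So the directory deployed with the function looks like this:

main.py            # the function code that launches the pipeline
requirements.txt   # used when deploying the function itself
setup.py           # intended for the Dataflow workers via --setup_file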

requirements.txt, which is used during deployment of the function itself, looks like this:

beam-nuggets==0.15.1
google-cloud-bigquery==1.17.1
apache-beam==2.19.0
google-cloud-dataflow==2.4.0
google-apitools==0.5.31

setup.py looks like this:

from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = ['beam-nuggets>=0.15.1']

setup(
    name='example',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='example desc'
)

I have been stuck for a few days; I tried different setup.py approaches and tried using requirements.txt instead of setup.py — no luck.

The logs just say:

{
  insertId: "000000-88232bc6-6122-4ec8-a4f3-90e9775e89f6"
  labels: {
    execution_id: "78ml14shfolv"
  }
  logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
  receiveTimestamp: "2020-07-13T12:08:35.898729649Z"
  resource: {
    labels: {
      function_name: "xxx"
      project_id: "xxx"
      region: "europe-west6"
    }
    type: "cloud_function"
  }
  severity: "INFO"
  textPayload: "Executing command: ['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']"
  timestamp: "2020-07-13T12:08:31.639Z"
  trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"
}
{
  insertId: "000000-3dfb239a-4067-4f9d-bd5f-bae5174e9dc7"
  labels: {
    execution_id: "78ml14shfolv"
  }
  logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
  receiveTimestamp: "2020-07-13T12:08:35.898729649Z"
  resource: {
    labels: {
      function_name: "xxx"
      project_id: "xxx"
      region: "europe-west6"
    }
    type: "cloud_function"
  }
  severity: "DEBUG"
  textPayload: "Function execution took 7798 ms, finished with status: 'crash'"
  timestamp: "2020-07-13T12:08:35.663674738Z"
  trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"
}

Additional information:

If I use

'--requirements_file=./requirements.txt'

instead of

'--setup_file=./setup.py'

I get:

Error: memory limit exceeded.

while running the test function in the GCP Functions portal.

After I increased the function's memory to 2 GB, it said:

Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full traceback: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1. 
 Pip install failed for package: -r         
 Output from execution of subprocess: b'Collecting beam-nuggets==0.15.1  
 Downloading beam-nuggets-0.15.1.tar.gz (17 kB)
  Saved /tmp/dataflow-requirements-cache/beam-nuggets-0.15.1.tar.gz
Collecting google-cloud-bigquery==1.17.1
  Downloading google-cloud-bigquery-1.17.1.tar.gz (228 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.17.1.tar.gz
Collecting apache-beam==2.19.0
  Downloading apache-beam-2.19.0.zip (1.9 MB)
  Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
Collecting google-cloud-dataflow==2.4.0
  Downloading google-cloud-dataflow-2.4.0.tar.gz (5.8 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-dataflow-2.4.0.tar.gz
Collecting google-apitools==0.5.31
  Downloading google-apitools-0.5.31.tar.gz (173 kB)
  Saved /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz
Collecting SQLAlchemy<2.0.0,>=1.2.14
  Downloading SQLAlchemy-1.3.18.tar.gz (6.0 MB)
  Saved /tmp/dataflow-requirements-cache/SQLAlchemy-1.3.18.tar.gz
Collecting sqlalchemy-utils<0.34,>=0.33.11
  Downloading SQLAlchemy-Utils-0.33.11.tar.gz (128 kB)
  Saved /tmp/dataflow-requirements-cache/SQLAlchemy-Utils-0.33.11.tar.gz
Collecting pg8000<2.0.0,>=1.12.4
  Downloading pg8000-1.16.0.tar.gz (75 kB)
  Saved /tmp/dataflow-requirements-cache/pg8000-1.16.0.tar.gz
Collecting PyMySQL<2.0.0,>=0.9.3
  Downloading PyMySQL-0.9.3.tar.gz (75 kB)
  Saved /tmp/dataflow-requirements-cache/PyMySQL-0.9.3.tar.gz
Collecting kafka>===1.3.5
  Downloading kafka-1.3.5.tar.gz (227 kB)
  Saved /tmp/dataflow-requirements-cache/kafka-1.3.5.tar.gz
Collecting google-cloud-core<2.0dev,>=1.0.0
 Downloading google-cloud-core-1.3.0.tar.gz (32 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-core-1.3.0.tar.gz
Collecting google-resumable-media<0.5.0dev,>=0.3.1
  Downloading google-resumable-media-0.4.1.tar.gz (2.1 MB)
  Saved /tmp/dataflow-requirements-cache/google-resumable-media-0.4.1.tar.gz
Collecting protobuf>=3.6.0
  Downloading protobuf-3.12.2.tar.gz (265 kB)
  Saved /tmp/dataflow-requirements-cache/protobuf-3.12.2.tar.gz
Collecting crcmod<2.0,>=1.7
  Downloading crcmod-1.7.tar.gz (89 kB)
  Saved /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
  Downloading dill-0.3.1.1.tar.gz (151 kB)
  Saved /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz
Collecting fastavro<0.22,>=0.21.4
  Downloading fastavro-0.21.24.tar.gz (496 kB)
  Saved /tmp/dataflow-requirements-cache/fastavro-0.21.24.tar.gz
Collecting future<1.0.0,>=0.16.0
  Downloading future-0.18.2.tar.gz (829 kB)
  Saved /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz
Collecting grpcio<2,>=1.12.1
  Downloading grpcio-1.30.0.tar.gz (19.7 MB)
    ERROR: Command errored out with exit status 1:
     command: /env/bin/python3.7 -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\
\'"\'"\', \'"\'"\'\
\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' egg_info --egg-base /tmp/pip-download-yjpzrbur/grpcio/pip-egg-info
         cwd: /tmp/pip-download-yjpzrbur/grpcio/
    Complete output (11 lines):
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 196, in <module>
        if check_linker_need_libatomic():
      File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 156, in check_linker_need_libatomic
        stderr=PIPE)
      File "/opt/python3.7/lib/python3.7/subprocess.py", line 800, in __init__
        restore_signals, start_new_session)
      File "/opt/python3.7/lib/python3.7/subprocess.py", line 1551, in _execute_child
        raise child_exception_type(errno_num, err_msg, err_filename)
    FileNotFoundError: [Errno 2] No such file or directory: \'cc\': \'cc\'
    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
WARNING: You are using pip version 20.0.2; however, version 20.1.1 is available.
You should consider upgrading via the \'/env/bin/python3.7 -m pip install --upgrade pip\' command.
'

The logs in this case:

{
  insertId: "000000-5e4c10f4-d542-4631-8aaa-b9306d1390fd"
  labels: {
    execution_id: "15jww0sd8uyz"
  }
  logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"
  receiveTimestamp: "2020-07-13T14:01:33.505683371Z"
  resource: {
    labels: {
      function_name: "xxx"
      project_id: "xxx"
      region: "europe-west6"
    }
    type: "cloud_function"
  }
  severity: "DEBUG"
  textPayload: "Function execution took 18984 ms, finished with status: 'crash'"
  timestamp: "2020-07-13T14:01:32.953194652Z"
  trace: "projects/xxx/traces/262224a3d230cd9a66b1eebba3d7c3e0"
}

Deploying the Dataflow job from my local machine works fine.

The command taken from the logs:

python -m pip download --dest ./tmp -r ./requirements.txt --exists-action i --no-binary :all:

also works fine from my local machine, although it takes several minutes and seems to download half the internet, even after I reduced requirements.txt to just beam-nuggets==0.15.1.

It gets stuck on

grpcio-1.30.0.tar.gz (19.7 MB)

It is precisely during the setup of this package that the following function from grpcio's setup.py crashes: the Cloud Functions runtime has no C compiler, so there is no cc on the PATH, hence the FileNotFoundError in the traceback above.

# From grpcio's setup.py; Popen(['cc', ...]) raises FileNotFoundError
# when no C compiler is installed, as in the Cloud Functions sandbox.
import subprocess
from subprocess import PIPE

def check_linker_need_libatomic():
    """Test if linker on system needs libatomic."""
    code_test = (b'#include <atomic>\n' +
                 b'int main() { return std::atomic<int64_t>{}; }')
    cc_test = subprocess.Popen(['cc', '-x', 'c++', '-std=c++11', '-'],
                               stdin=PIPE,
                               stdout=PIPE,
                               stderr=PIPE)
    cc_test.communicate(input=code_test)
    return cc_test.returncode != 0

【Question Comments】:

  • Definitely a thorough explanation :) The problem seems to point to your dependencies, or missing ones. Have you tried downloading your dependencies to your machine first, then loading them all into a zip file and checking whether your Cloud Function deploys successfully? Make sure all dependencies are in place before deploying so you don't run into import problems later.
  • @sllopis The Cloud Function is not the problem here; it deploys successfully. But running the Cloud Function deploys the Dataflow job, and that deployment fails — yet only from the Cloud Function; deploying the Dataflow job directly from my PC works fine. Is there a way to skip running setup.py for the grpcio package, even though it is required by beam-nuggets, which I use in my code? python -m pip download --dest ./tmp -r ./requirements.txt --exists-action i --no-binary :all: is invoked automatically; how can I force pip to use local files?
  • Note that running a pipeline locally is not the same as running it in the cloud. The documentation also explicitly mentions two ways of specifying dependencies in Cloud Functions: a requirements.txt file using the pip package manager, or packaging local dependencies together with the function. Now, given that beam-nuggets depends on the external dependency grpc, have you tried including grpc directly in your requirements.txt file? What happens?
  • I was not describing running it locally; that is a completely different story. I intend to deploy it (to GCP Dataflow) either from my local machine or from the GCP Cloud Functions machine, but '--runner=DataflowRunner' is always set, if that is what you are asking.

Tags: python postgresql google-cloud-platform google-cloud-functions google-dataflow


【Solution 1】:

I also tried GCP App Engine instead of Cloud Functions, with the same result, but it led me to the right solution. Thanks to this and this, I was able to build an external package from beam-nuggets and include it via --extra_package instead of --setup_file.
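
The gist of it (a sketch with a hypothetical tarball name; --extra_package is the standard Beam/Dataflow option for shipping a prebuilt local package to the workers):

# Build the package locally, e.g. python setup.py sdist --dist-dir ./dist/,
# deploy the resulting tarball together with the function code, and
# reference it in the pipeline arguments instead of --setup_file:
pipe_arguments = [
    '--project={0}'.format(PROJECT),
    '--staging_location=gs://xxx.appspot.com/staging/',
    '--temp_location=gs://xxx.appspot.com/temp/',
    '--runner=DataflowRunner',
    '--region=europe-west4',
    '--extra_package=./dataflow_deps.tar.gz',  # hypothetical file name
]

This sidesteps running setup.py sdist (and pip download --no-binary) inside the Cloud Function sandbox, which is where both failures occurred.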

The problem with compiling grpcio (forced by the non-configurable --no-binary ':all:') remains, and so does the problem with the strange setup.py error.

But deployment from Cloud Functions to Dataflow (with dependencies) now works, so the problem is solved as far as I am concerned.

Update:

Right after that I ran into:

in _import_module return __import__(import_name) ModuleNotFoundError: No module named 'main'

Since I was not using any 'main' module, this was very hard to track down. It turned out I had to move every function defined in my main.py file (hence the module name in the error) into the external package. So the extra_package tarball contains all the external dependencies plus my own module holding my functions.
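
In other words (a sketch with made-up names): every DoFn or helper the pipeline references lives in a module inside the extra package, and main.py only imports it:

# mypipeline/transforms.py -- inside the extra package (hypothetical name);
# the workers import this module, so they never need a 'main' module.
import apache_beam as beam

class ReshapeRecord(beam.DoFn):
    def process(self, row):
        # placeholder transformation
        yield {'id': row['id'], 'value': row['value']}

# main.py then only does:
#     from mypipeline.transforms import ReshapeRecord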

【Comments】:

  • "The problem with the strange setup.py error also remains.": I got the same error as you, but another log entry showed "error: could not create 'my_package.egg-info': Read-only file system". The App Engine file system is read-only, so it may be the same in your case.