【Question title】: Apache Beam Python on FlinkRunner Failing with java.io.FileNotFoundException
【Posted】: 2020-08-20 08:02:40
【Question description】:

I am new to Apache Beam. I have created a simple Python pipeline, which I run with the following command:

python scripts/main.py \
       --runner FlinkRunner \
       --flink_master localhost:8081 \
       --setup_file scripts/setup.py \
       --environment_type EXTERNAL \
       --environment_config localhost:50000 \
       --database postgres \
       --input /dataset/league_of_legends.csv \
       --database_host db \
       --table_name league_game_board \
       --database_user postgres \
       --database_password postgres

Technical details:

  • Apache Flink version: 1.10.1
  • Kubernetes version: 1.18
  • Python: 3.8.5
  • Minikube: 1.12.3
  • Apache Beam: v1.23
  • Apache Beam Python SDK: 3.7

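I have set up an Apache Flink cluster on minikube and port-forwarded to the JobManager, so that running the script above submits the job. I used kubectl cp to copy the dataset into the containers and exec'd into them to confirm that the data is in the right location. Everything seems to go well until the job reaches the TaskManager. The TaskManager submits the job to the Apache Beam server, which then fails with the following output in the logs: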

java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/beam-tempabvjrxov/artifactsal8uquz5/9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.open0(Native Method)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.open(FileInputStream.java:195)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.io.FileInputStream.<init>(FileInputStream.java:138)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
flink-taskmanager-585fc984d8-hhzg2 taskmanager  at java.lang.Thread.run(Thread.java:748)

The Beam job server then fails with a similar error:

flink-taskmanager-585fc984d8-hhzg2 beamserver 2020/08/20 06:19:56 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver   caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver   caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver   caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver   caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc =

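The setup of my minikube Flink cluster can be found here.

I have searched for any reference to /tmp/staged/workflow.tar.gz or 1-ref_Environment_default_e-workflow.tar.gz, but I cannot find anything on how to provide this file to my setup.

The question is: what is this tarball, and how do I provide it to my Flink cluster?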

【Question comments】:

  • Can you try removing the flags --environment_type EXTERNAL --environment_config localhost:50000?
  • That was my original setup. --environment_type then defaults to DOCKER, and because I am running in Kubernetes this requires a Docker-in-Docker setup, where I ran into issues.apache.org/jira/browse/BEAM-6020, which is currently unresolved.

Tags: apache-flink apache-beam


【Solution 1】:

The workflow tarball is the file created by setup.py:

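import setuptools

# Minimal setup.py: packages every Python package found next to this file.
setuptools.setup(
    name='PACKAGE-NAME',        # placeholder: replace with your package name
    version='PACKAGE-VERSION',  # placeholder: replace with your version
    install_requires=[],        # list runtime dependencies here
    packages=setuptools.find_packages(),
)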
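One way to check what setup.py actually packaged is to list the contents of the resulting archive. The sketch below assumes you already have a tarball on disk, for example one built with `python setup.py sdist`. The helper names `list_tarball_members` and `missing_files` are hypothetical and not part of Beam.

```python
import tarfile


def list_tarball_members(tarball_path):
    """Return the sorted names of the regular files inside a .tar.gz archive."""
    with tarfile.open(tarball_path, "r:gz") as archive:
        return sorted(m.name for m in archive.getmembers() if m.isfile())


def missing_files(tarball_path, expected_suffixes):
    """Return the expected path suffixes that no file in the archive ends with."""
    names = list_tarball_members(tarball_path)
    return [
        suffix
        for suffix in expected_suffixes
        if not any(name.endswith(suffix) for name in names)
    ]
```

For example, `missing_files("dist/PACKAGE-NAME-PACKAGE-VERSION.tar.gz", ["mypkg/transforms.py"])` (both paths are placeholders) returns an empty list when that module was packaged.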

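Without it, your code only works if it is fully contained in a single file, with no code in other files or folders. You pass the path to setup.py with the --setup_file argument. Warning: the file must be named setup.py. You cannot give it a name of your own.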
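The naming rule can be checked before the pipeline is launched. This is a minimal sketch using only the standard library. The function `setup_file_args` is a hypothetical helper, not a Beam API. It builds the two command-line arguments and rejects any file that is not named setup.py.

```python
from pathlib import PurePosixPath


def setup_file_args(setup_path):
    """Return the --setup_file arguments, or raise if the file is not named setup.py."""
    path = PurePosixPath(setup_path)
    if path.name != "setup.py":
        raise ValueError(
            f"--setup_file must point to a file named setup.py, got {path.name!r}"
        )
    return ["--setup_file", str(path)]


if __name__ == "__main__":
    # The path is the one used in the question's command line.
    print(setup_file_args("scripts/setup.py"))
```

The returned list can be appended to the other pipeline arguments before they are passed to the script.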
