【Posted】: 2020-08-20 08:02:40
【Question】:
I am new to Apache Beam. I created a simple Python pipeline, which I run with the following command:
python scripts/main.py \
--runner FlinkRunner \
--flink_master localhost:8081 \
--setup_file scripts/setup.py \
--environment_type EXTERNAL \
--environment_config localhost:50000 \
--database postgres \
--input /dataset/league_of_legends.csv \
--database_host db \
--table_name league_game_board \
--database_user postgres \
--database_password postgres
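The command above mixes runner flags (--runner, --flink_master, --environment_type, ...) with flags that only the script itself understands (--database, --input, ...). scripts/main.py is not shown in the question, so the following is only a minimal sketch of how such a script might separate the two groups using the standard library; the function name `split_args` is hypothetical and not taken from the original script.

```python
import argparse


def split_args(argv):
    """Split script-specific flags from flags intended for the Beam runner.

    Hypothetical helper: the flag names mirror the command in the question,
    but the real scripts/main.py may parse them differently.
    """
    # allow_abbrev=False prevents argparse from guessing at partial flag names.
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--input")
    parser.add_argument("--database")
    parser.add_argument("--database_host")
    parser.add_argument("--table_name")
    parser.add_argument("--database_user")
    parser.add_argument("--database_password")
    # parse_known_args returns the unrecognized flags (for example --runner
    # or --environment_type) unchanged, in order, as the second value.
    known, pipeline_args = parser.parse_known_args(argv)
    return known, pipeline_args
```

In a Beam script, the second return value would typically be handed to `PipelineOptions(pipeline_args)` from `apache_beam.options.pipeline_options`, so that the runner only sees the flags it recognizes.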
Technical details:
- Apache Flink version: 1.10.1
- Kubernetes version: 1.18
- Python: 3.8.5
- Minikube: 1.12.3
- Apache Beam: v1.23
- Apache Beam Python SDK: 3.7
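I have set up an Apache Flink cluster on minikube and port-forwarded to the Jobmanager so that the job is submitted when I run the command above. I used kubectl cp to copy the dataset into the containers and exec'd into them to make sure the data is in the right location. Everything seems to go well until the job reaches the Taskmanager. The Taskmanager submits the job to the Apache Beam server, but it then fails with the following output in the logs: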
java.io.FileNotFoundException: /var/folders/qv/ztv4pp7n4r1gv38m2pj94l_00000gn/T/beam-tempabvjrxov/artifactsal8uquz5/9a7e79a7285955a56f5ab55fa5eb522eeb5a7bfcbbfe616a6b0bef5314a21ee8/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.open0(Native Method)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.open(FileInputStream.java:195)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.io.FileInputStream.<init>(FileInputStream.java:138)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:121)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:96)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
flink-taskmanager-585fc984d8-hhzg2 taskmanager at java.lang.Thread.run(Thread.java:748)
The Beam job server then fails with a similar error:
flink-taskmanager-585fc984d8-hhzg2 beamserver 2020/08/20 06:19:56 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
flink-taskmanager-585fc984d8-hhzg2 beamserver caused by:
flink-taskmanager-585fc984d8-hhzg2 beamserver rpc error: code = Unknown desc =
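The setup of my minikube Flink cluster can be found here.
I have searched for any reference to /tmp/staged/workflow.tar.gz and 1-ref_Environment_default_e-workflow.tar.gz, but I cannot find anything on how to provide it to my setup.
The question is: what is this tarball, and how do I make it available to my Flink cluster?
【Comments】: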
-
Could you try removing the flags --environment_type EXTERNAL --environment_config localhost:50000?
-
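That was my original setup. In that case --environment_type defaults to DOCKER, and because I am running in Kubernetes this requires a Docker-in-Docker setup, where I ran into this issue: issues.apache.org/jira/browse/BEAM-6020, which is currently unresolved.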