【Posted】: 2021-07-26 11:50:24
【Question】:
LE: I want to do this on macOS.
I am trying to run Apache Beam on a separate Flink cluster, as described here: https://beam.apache.org/documentation/runners/flink/#executing-a-beam-pipeline-on-a-flink-cluster
(1) Start a Flink cluster which exposes the Rest interface (e.g. localhost:8081 by default).
Flink 1.13.1 started successfully on localhost:8081.
(2) Start JobService with the Flink Rest endpoint:
docker run --net=host apache/beam_flink1.10_job_server:latest --flink-master=localhost:8081
I am running the following instead, and it also starts successfully:
docker run --net=host apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081
(3) Submit the pipeline as above.
I am running the WordCount pipeline:
python -m apache_beam.examples.wordcount --input /Users/stefan/datastore/input.txt \
    --output /Users/stefan/datastore/output.txt \
    --runner=PortableRunner --job_endpoint=localhost:8099
After about 1 minute, the pipeline crashes with grpc.FutureTimeoutError. Here is the full output:
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.31.0
INFO:root:No image given, using default Python SDK image
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.31.0
INFO:root:Python SDK container image set to "apache/beam_python3.8_sdk:2.31.0" for Docker environment
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x137d441f0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x137d44280> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x137d449d0> ====================
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.11/Frameworks/Python.framework/Versions/3.8/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/Cellar/python@3.8/3.8.11/Frameworks/Python.framework/Versions/3.8/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/examples/wordcount.py", line 94, in <module>
    run()
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/examples/wordcount.py", line 89, in run
    output | 'Write' >> WriteToText(known_args.output)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 438, in run_pipeline
    job_service_handle = self.create_job_service(options)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 317, in create_job_service
    return self.create_job_service_handle(server.start(), options)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/apache_beam/runners/portability/job_server.py", line 54, in start
    grpc.channel_ready_future(channel).result(timeout=self._timeout)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/grpc/_utilities.py", line 140, in result
    self._block(timeout)
  File "/Users/stefan/workspace/biodiversity/.env/lib/python3.8/site-packages/grpc/_utilities.py", line 86, in _block
    raise grpc.FutureTimeoutError()
grpc.FutureTimeoutError
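The traceback ends in grpc.channel_ready_future(channel).result(timeout=...), so the client apparently never obtained a ready channel to the job endpoint. A minimal sketch for checking whether anything is accepting TCP connections on the two ports used above is shown here. The helper name port_open and the 2-second timeout are my own choices for illustration, not part of Beam or Flink, and a successful TCP connect does not by itself prove that the gRPC service behind it is healthy.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper for illustration; it only tests TCP reachability.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False


if __name__ == "__main__":
    # Ports taken from the commands above: Flink REST and the Beam job endpoint.
    for name, port in [("Flink REST", 8081), ("Beam job endpoint", 8099)]:
        state = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} localhost:{port}: {state}")
```

If localhost:8099 is reported as not reachable from the host while the job server container is running, the timeout above would be the expected outcome, and the container's network setup would be the first thing to look at.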
Any idea what is going on? Thanks for your help. I have also opened a bug report about this issue here: https://issues.apache.org/jira/browse/BEAM-12657
【Discussion】: