[Posted]: 2020-06-24 19:59:36
[Question]:
I used a slightly modified version of the wordcount example (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py), replacing the process function with the following:
def process(self, element):
    """Returns an iterator over the words of this element.

    The element is a line of text. If the line is blank, note that, too.

    Args:
        element: the element being processed

    Returns:
        The processed element.
    """
    import random
    import time
    n = random.randint(0, 1000)
    time.sleep(5)
    logging.getLogger().warning('PARALLEL START? ' + str(n))
    time.sleep(5)
    text_line = element.strip()
    if not text_line:
        self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
        self.words_counter.inc()
        self.word_lengths_counter.inc(len(w))
        self.word_lengths_dist.update(len(w))
    time.sleep(5)
    logging.getLogger().warning('PARALLEL END? ' + str(n))
    time.sleep(5)
    return words
The idea is to check whether the step is executed in parallel. For example, the expected output would be something like:
PARALLEL START? 447
PARALLEL START? 994
PARALLEL END? 447
PARALLEL START? 351
PARALLEL START? 723
PARALLEL END? 994
PARALLEL END? 351
PARALLEL END? 723
However, the actual result looks like this, which indicates that the step is not running in parallel:
PARALLEL START? 447
PARALLEL END? 447
PARALLEL START? 994
PARALLEL END? 994
PARALLEL START? 351
PARALLEL END? 351
PARALLEL START? 723
PARALLEL END? 723
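The difference between the two logs can also be checked mechanically instead of by eye. The following is a minimal sketch, not part of the original pipeline: the helper name max_concurrency and the regular expression are made up for illustration, and it assumes that the random ids of calls in flight at the same time do not collide.

```python
import re

# Matches the two warning messages emitted by the modified process() method.
LOG_LINE = re.compile(r'PARALLEL (START|END)\? (\d+)')


def max_concurrency(log_lines):
    """Return the largest number of process() calls that were in flight at once."""
    in_flight = set()
    peak = 0
    for line in log_lines:
        match = LOG_LINE.search(line)
        if not match:
            continue  # ignore unrelated log output
        event, call_id = match.groups()
        if event == 'START':
            in_flight.add(call_id)
            peak = max(peak, len(in_flight))
        else:
            in_flight.discard(call_id)
    return peak


expected_log = """\
PARALLEL START? 447
PARALLEL START? 994
PARALLEL END? 447
PARALLEL START? 351
PARALLEL START? 723
PARALLEL END? 994
PARALLEL END? 351
PARALLEL END? 723
"""

actual_log = """\
PARALLEL START? 447
PARALLEL END? 447
PARALLEL START? 994
PARALLEL END? 994
PARALLEL START? 351
PARALLEL END? 351
PARALLEL START? 723
PARALLEL END? 723
"""

print(max_concurrency(expected_log.splitlines()))  # overlapping calls
print(max_concurrency(actual_log.splitlines()))    # strictly sequential calls
```

A peak of 1 means every call finished before the next one started, which is what the actual output shows; any value above 1 means at least two calls overlapped.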
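I tried the LocalRunner with direct_num_workers set manually, as well as the DataflowRunner with multiple workers, to no avail. What can be done to make sure this step actually runs in parallel?
Update: the multiprocessing mode found here looks promising. However, when I use it from the Windows command line (python wordcount.py --region us-east1 --setup_file setup.py --input_file gs://dataflow-samples/shakespeare/kinglear.txt --output output/), I get the following error: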
Exception in thread run_worker:
Traceback (most recent call last):
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\portability\local_job_service.py", line 218, in run
    p = subprocess.Popen(self._worker_command_line, shell=True, env=env_dict)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 775, in __init__
    restore_signals, start_new_session)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 1119, in _execute_child
    args = list2cmdline(args)
  File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 530, in list2cmdline
    needquote = (" " in arg) or ("\t" in arg) or not arg
TypeError: argument of type 'int' is not iterable
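The last frames of the traceback suggest that the argument list handed to subprocess.Popen contained a value that is not a string. The sketch below reproduces that failure mode in isolation with the standard library only. The command list is hypothetical: the traceback does not show which element of Beam's worker command line was an int, so the port number here is just a stand-in.

```python
import subprocess

# Hypothetical command line. The integer stands in for whatever non-string
# value ended up in the worker command; it is not taken from Beam itself.
command = ['python', 'worker.py', '--port', 50051]

try:
    # On Windows, Popen calls this helper to turn an argument list into a
    # single command string; it expects every element to be a string.
    subprocess.list2cmdline(command)
    failed = False
except TypeError as error:
    failed = True
    print('TypeError:', error)

# Converting every argument to str before building the command avoids the error.
safe_command = [str(arg) for arg in command]
quoted = subprocess.list2cmdline(safe_command)
print(quoted)
```

If this is indeed what happens inside the SDK, the conversion would have to take place where the worker command line is assembled, so from user code the practical options are probably limited to passing the relevant options as strings or trying a different SDK version.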
[Comments]:
-
What input data are you using?
-
I kept the default input for wordcount: storage.googleapis.com/dataflow-samples/shakespeare/… ('gs://dataflow-samples/shakespeare/kinglear.txt')
-
What version of the Python Beam SDK are you using?
-
It was 2.19.0; I have just updated to 2.22.0. Thanks for the heads-up. Unfortunately, the behavior is still the same.
Tags: python parallel-processing google-cloud-dataflow apache-beam