【问题标题】:python for large data processing用于大数据处理的python
【发布时间】:2015-11-02 05:16:38
【问题描述】:

我对 python 比较陌生,并且已经能够根据表单上回答的类似问题回答我的大部分问题,但我正处于被困的地步,可以使用一些帮助。

我有一个简单的嵌套 for 循环脚本,可以生成字符串的输出。接下来我需要做的是让每个分组都经过一个模拟,基于字符串也将匹配的数值。

真的,我的问题是如何以最好的方式解决这个问题?我不确定多线程是否会工作,因为字符串是生成的,然后需要进行模拟,一次一组。我正在阅读有关队列的信息,不确定是否可以将它们传递到队列中进行存储,然后按照进入队列的相同顺序进行模拟。

无论我做过什么研究,我都愿意接受任何人就此事提出的任何建议。

谢谢!

编辑:我不是在寻找关于如何进行模拟的答案,而是在计算模拟时存储组合的方法

例子

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        print(D)

【问题讨论】:

  • 代码示例会很好。
  • 你到底打算做什么?真实的代码示例/所需的输出?你在做什么模拟?为什么这与“大数据”有关?
  • 代码是这样的,它找到了三个列表的所有组合,这不是一个学校项目,我不能发布它的确切代码,但是列表非常大。 D 中的值将与其他数值匹配,然后进行模拟谢谢,dan
  • 我不明白。这段代码 sn-p 是什么意思?模拟过程在哪里?你想存储什么?介意修改您的代码以使这些清晰吗?

标签: python multithreading algorithm queue multiprocessing


【解决方案1】:

正如 cmets 中所暗示的,multiprocessing 模块就是您要寻找的。由于全局解释器锁 (GIL),线程不会帮助您,它一次将执行限制为一个 Python 线程。特别是,我会看multiprocessing pools。这些对象为您提供了一个接口,让子流程池与主流程并行为您工作,您可以稍后返回并检查结果。

您的示例 sn-p 可能如下所示:

import multiprocessing

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

pool = multiprocessing.pool() # by default, this will create a number of workers equal to
                 # the number of CPU cores you have available
combination_list = [] # create a list to store the combinations

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        combination_list.append(D) # append this combination to the list

results = pool.map(simulation_function, combination_list)
# simulation_function is the function you're using to actually run your
# simulation - assuming it only takes one parameter: the combination

pool.map 的调用是阻塞的——这意味着一旦你调用它,主进程中的执行将停止,直到所有模拟都完成,但它正在并行运行它们。当它们完成时,无论您的模拟函数返回什么,都将在 results 中可用,其顺序与输入参数在 combination_list 中的顺序相同。

如果您不想等待它们,您也可以在您的池中使用apply_async 并将结果存储起来以供以后查看:

import multiprocessing

X = ["a","b"]
Y = ["c","d","e"]
Z= ["f","g"]

pool = multiprocessing.pool()
result_list = [] # create a list to store the simulation results

for A in itertools.combinations(X,1):
    for B in itertools.combinations(Y,2):
        for C in itertools.combinations(Z, 2):

        D = A + B + C
        result_list.append(pool.apply_async(
                simulation_function,
                args=(D,))) # note the extra comma - args must be a tuple

# do other stuff
# now iterate over result_list to check the results when they're ready

如果你使用这个结构,result_list 将充满multiprocessing.AsyncResult objects,这样你就可以用result.ready() 检查它们是否准备好了,如果准备好了,就用result.get() 检索结果。这种方法将导致在计算组合时立即启动模拟,而不是等到所有组合都计算完毕后再开始处理它们。缺点是管理和检索结果有点复杂。例如,您必须确保结果已准备好或准备好捕获异常,您需要准备好捕获可能在工作函数中引发的异常等。这些警告在文档中得到了很好的解释。

如果计算组合实际上并不需要很长时间,并且您不介意在它们都准备好之前暂停主进程,我建议使用 pool.map 方法。

【讨论】:

  • @djk47463 我看到你对 Stack Overflow 很陌生。欢迎!我只是想让您知道“接受”选项 - 在您的问题答案的投票计数下方有一个空心复选标记。当您提出问题时,您可以通过单击该答案上的复选标记来接受对您最有帮助的答案之一。如果没有一个答案让您的问题令您满意,那么这样做不应该有任何压力,但如果有,那么接受它是一种很好的礼仪。更多详情:stackoverflow.com/help/accepted-answer
猜你喜欢
  • 1970-01-01
  • 2016-11-10
  • 1970-01-01
  • 1970-01-01
  • 2020-06-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多