【问题标题】:Receive multiple send commands using mpi4py使用 mpi4py 接收多个发送命令
【发布时间】:2016-07-05 15:06:36
【问题描述】:

如何修改以下代码(改编自http://materials.jeremybejarano.com/MPIwithPython/pointToPoint.html),以便root = 0 接收每个comm.Send 实例并打印输出。目前只接收到第一个发送命令。

#passRandomDraw.py
import numpy
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"


else:
    for i in range(0,np.random.randint(1,10),1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0)


if rank == 0:
   comm.Recv(randNum, ANY_SOURCE)
   print "Process", rank, "received the number", randNum[0]

【问题讨论】:

  • 只是为了澄清/确认。我是否假设您没有机会事先知道每个等级将发送多少条消息,即使是在等级本身上也是如此?
  • 理想情况下,我想从每个等级发送消息,而不必知道该等级将发送多少消息。如果这不可行,则可以计算在等级内将发送多少条消息,但对于每个等级来说这将是不同的。

标签: python numpy mpi mpi4py


【解决方案1】:

如果您不知道要发送多少条消息,那么您必须引入一条消息来标记消息的结束。您可以通过使用特殊标签来通用地使用它。为避免为终止消息提供不匹配的缓冲区,您可以使用probe 检查传入的消息类型

tag_data = 42
tag_end = 23

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"
else:
    for i in range(0,np.random.randint(1,10),1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0, tag=tag_data)
    # send the termination message. Using the lower-case interface is simpler
    comm.send(None, dest=0, tag=tag_end)

if rank == 0:
    # For debugging it might be better to use a list of still active procsses
    remaining = comm.Get_size() - 1
    while remaining > 0:
        s = MPI.Status()
        comm.Probe(status=s)
        # make sure we post the right kind of message
        if s.tag == tag_data:
            comm.Recv(randNum, s.source, tag=tag_data)
            print "Process ", s.source, " received the number", randNum[0]
        elif s.tag == tag_end:
            # don't need the result here
            print "Process ", rank, " is done"
            comm.recv(source=s.source, tag=tag_end)
            remaining -= 1

这有很多变体。例如,如果您知道一条消息是最后一条消息,则可以合并终止消息。

【讨论】:

  • 目前这对我来说在comm.probe(status=s) 行失败,错误为comm.probe(status=s) AttributeError: 'mpi4py.MPI.Intracomm' object has no attribute 'probe'。此外,您选择的标签编号是否有重要意义,或者是否只需将tag_endtag_data 分开识别,任何数字都可以?还是为特定进程保留某些数字?
  • 也许 probe 是在 mpi4py 的 2.0.0 版本中引入的。只需用大写的Probe 替换,应该没有区别。标签值是完全任意的,它们只需要不同。我的示例中的实际值是对4223 的引用,尽管我实际上没有看到后者。
  • 看来comm.probe 确实只出现在 mpi4py 2.0.0 版中。我有一个python3 安装使用这个和你发布的原始代码(显然修改了print 语句)。但是,使用旧版本的 mpi4py comm.Probe 会导致进程挂起。同样在print "Process ", rank, " is done" 行中,它总是打印rank=0。大概用s.source替换rank会打印刚刚终止并发送tag_endrank
  • 不知道为什么它会挂在旧版本中?你知道它挂在哪里,或者你可以使用当前版本的mpi4py?是的,rank 应该是 s.source。我在答案中修复了它。
【解决方案2】:

如果每个进程都知道要发送的消息数量,则可以设计以下步骤来解决问题:

1) 减少发送到根进程的消息数量。每个进程向根发送它稍后将发送的消息数量。该操作称为归约,可以通过函数comm.reduce(...)来执行

2) 接收进程 0 上的所有消息。

这是基于您的代码,应该可以解决问题。可以mpirun -np 4 python main.py运行

#passRandomDraw.py
import numpy
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

#just in case, if numpy.random is seed with 
np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)

if rank == 0:
    randNum = numpy.zeros(1)
    print "Process before receiving random numbers"
    nb=np.empty((1,),dtype=int)
    nb0=np.zeros((1,),dtype=int)
    comm.Reduce([nb0, MPI.INT],[nb, MPI.INT],op=MPI.SUM, root=0)  #sums the total number of random number from every process on rank 0, in nb.
    #print "rank"+str(rank)+" nb "+str(nb)
else:
    nb=np.empty((1,),dtype=int)
    nb[0]=np.random.randint(1,10)
    #print "rank"+str(rank)+" nb "+str(nb)
    comm.Reduce([nb, MPI.INT],None,op=MPI.SUM, root=0)
    for i in range(0,nb[0],1):
        randNum = numpy.zeros(1)
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "iteration", i, "drew the number", randNum[0]
        comm.Send(randNum, dest=0)



if rank == 0:
   for i in range(nb[0]): #receives nb message, each one with its int.
       comm.Recv(randNum, ANY_SOURCE)
       print "Process", rank, "received the number", randNum[0]

根据documentation of numpy.random(),Mersenne Twister 伪随机数生成器最初由从/dev/urandom(或 Windows 模拟)中提取的数字(如果可用)或时钟种子。因此,在最后一种情况下,所有进程都可以接收相同的种子并生成相同的随机数。为了防止这种情况发生,我添加了以下行:

np.random.seed(np.random.randint(np.iinfo(np.uint32).min,np.iinfo(np.uint32).max)+rank)

【讨论】:

    猜你喜欢
    • 2021-11-14
    • 2014-02-01
    • 1970-01-01
    • 2020-10-24
    • 2011-01-10
    • 2011-01-28
    • 2012-09-20
    • 1970-01-01
    • 2015-12-14
    相关资源
    最近更新 更多