【问题标题】:Python multiprocessing Pool.apply_async with shared variables (Value)Python 多处理 Pool.apply_async 与共享变量(值)
【发布时间】:2015-06-08 10:26:58
【问题描述】:

对于我的大学项目,我正在尝试开发一个基于 python 的流量生成器。我在 vmware 上创建了 2 台 CentOS 机器,我使用 1 作为我的客户端和 1 作为我的服务器机器。我已经使用IP aliasing 技术来增加仅使用单个客户端/服务器机器的客户端和服务器的数量。到目前为止,我已经在我的客户端机器上创建了 50 个 IP 别名,在我的服务器机器上创建了 10 个 IP 别名。我还使用多处理模块同时生成从所有 50 个客户端到所有 10 个服务器的流量。我还在我的服务器上开发了一些配置文件(1kb、10kb、50kb、100kb、500kb、1mb)(在 /var/www/html 目录中,因为我使用的是 Apache 服务器)并且我正在使用 urllib2 向这些配置文件发送请求我的客户端机器。我使用httplib+urllib2首先绑定到任何一个源别名ip,然后使用urllib2从这个ip发送请求。这里到increase my number of TCP Connections,我正在尝试使用 multiprocessing.Pool.apply_async 模块。但是我在运行我的脚本时收到此错误“RuntimeError:同步对象只能通过继承在进程之间共享”。经过一番调试,我发现这个错误是由于使用了multiprocessing.Value引起的。但是我想在我的进程之间共享一些变量,并且我还想增加我的 TCP 连接数。在这里可以使用哪些其他模块(除了 multiprocessing.Value)来共享一些公共变量?或者这个查询还有其他解决方案吗?

'''
Traffic Generator Script:

 Here I have used IP Aliasing to create multiple clients on single vm machine. 
 Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers
'''
import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script
m=multiprocessing.Manager()
response_time=m.list()    #some shared variables
error_count=multiprocessing.Value('i',0)
def send_request3():    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4():    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3)
        pool.apply_async(send_request4)
        pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return
start=float(time.time())
func()
end=float(time.time())-start
print end

【问题讨论】:

  • response_time ,是你的共享变量,不是吗?
  • 使用multiprocessing.Queue,而不是一个全局变量IMO。
  • 如果我不使用共享变量 response_time 和 error_count,Pool.apply_async 可以完美运行。
  • @pnv yes response_time 和 error_count 是我的共享变量。
  • @pnv 我从未使用过 multiprocessing.Queue。你能指导我如何在这里使用 multiprocessing.Queue 吗?

标签: python linux pool python-multiprocessing


【解决方案1】:

正如错误消息所述,您不能通过 pickle 传递 multiprocessing.Value。但是,您可以使用multiprocessing.Manager().Value

import multiprocessing
import urllib2
import random
import myurllist    #list of all destination urls for all 10 servers
import time
import socbindtry   #script that binds various virtual/aliased client ips to the script

def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        with error_count.get_lock():
            error_count.value += 1

#50 such functions are defined here for 50 clients

def func(response_time, error_count):
    pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count())
    args = (response_time, error_count)
    for i in range(5):
        pool.apply_async(send_request3, args=args)
        pool.apply_async(send_request4, args=args)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return

if __name__ == "__main__":
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=m.Value('i',0)

    start=float(time.time())
    func(response_time, error_count)
    end=float(time.time())-start
    print end

这里还有一些注意事项:

  1. 对 750 个进程使用 Pool 不是一个好主意。除非您使用具有数百个 CPU 内核的服务器,否则这将使您的机器不堪重负。使用更少的流程会更快,并且对您的机器施加的压力更小。更像2 * multiprocessing.cpu_count()
  2. 作为最佳实践,您应该将需要使用的所有共享参数显式传递给子进程,而不是使用全局变量。这增加了代码在 Windows 上运行的机会。
  3. 看起来您的所有send_request* 函数都做了几乎完全相同的事情。为什么不只创建一个函数并使用一个变量来决定使用哪个socbindtry.BindableHTTPHandler?这样做可以避免代码重复。
  4. 递增error_count 的方式不是进程/线程安全的,并且容易受到竞争条件的影响。您需要使用锁来保护增量(就像我在上面的示例代码中所做的那样)。

【讨论】:

  • @dano,您需要锁定经理管理的价值吗?经理不负责访问价值吗?另见bugs.python.org/issue35786
  • @Avraham 是的,我相信你会的。经理管理的 Value 只是常规 Value 的代理,所以我认为没有任何方法可以使 += 操作原子化,因为它会导致对代理的两次单独调用(一次获取当前值,然后另一个将它设置为新的值)。很好地了解get_lock() 问题。我认为您必须改用经理管理的Lock()
  • @dano,FWIW,在我正在使用的代码中,将管理器值传递给 15 个文件上名为 asyncronoulsy 的池,使用普通托管值可以在不使用管理器锁的情况下正常工作,但无法执行时前面有一个带锁。我正在调查 value 是否会比我用作计数器的队列更快,答案似乎是否定的,正如你所说,因为我使用的是 Pool,everyting 被腌制并且经理处理对队列的访问和两者都值。谢谢!
【解决方案2】:

可能,因为 Python Multiprocess diff between Windows and Linux (说真的,我不知道虚拟机中的多处理是如何工作的,就像这里的情况一样。)

这可能有效;

import multiprocessing
import random
import myurllist    #list of all destination urls for all 10 servers
import time

def send_request3(response_time, error_count):    #function to send requests from alias client ip 1
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3)    #bind to alias client ip1
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
def send_request4(response_time, error_count):    #function to send requests from alias client ip 2
    opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4)    #bind to alias client ip2
    try:
        tstart=time.time()
        for i in range(myurllist.url):
            x=random.choice(myurllist.url[i])
            opener.open(x).read()
            print "file downloaded:",x
            response_time.append(time.time()-tstart)
    except urllib2.URLError, e:
        error_count.value=error_count.value+1
#50 such functions are defined here for 50 clients
def func():
    m=multiprocessing.Manager()
    response_time=m.list()    #some shared variables
    error_count=multiprocessing.Value('i',0)

    pool=multiprocessing.Pool(processes=750)
    for i in range(5):
        pool.apply_async(send_request3, [response_time, error_count])
        pool.apply_async(send_request4, [response_time, error_count])
        # pool.apply_async(send_request5)
#append 50 functions here
    pool.close()
    pool.join()
    print"All work Done..!!"
    return


start=float(time.time())
func()
end=float(time.time())-start
print end

【讨论】:

  • 不,这对我不起作用.. 无论如何感谢您的帮助
  • 是否在VM中无关紧要,重要的是在VM中运行的操作系统。这就是虚拟机的全部意义——其中运行的任何软件都不需要知道它不是真正的硬件。虚拟机正在运行 CentOS,因此所有正常的 Linux multiprocessing 行为都适用。
猜你喜欢
  • 2020-06-12
  • 1970-01-01
  • 2013-06-26
  • 1970-01-01
  • 2019-12-11
  • 1970-01-01
  • 2016-07-21
  • 2016-11-14
  • 2017-06-18
相关资源
最近更新 更多