【Question title】: How to parallelize a task in Flask?
【Posted】: 2016-08-19 08:51:12
【Question】:

I'm sending an XHR request to my Flask server in order to run several pings on the network.

Resource

def get(self, site_id):
    …
    for printer in printers:
        hostname = printer['hostname']
        response[site_id][hostname] = network_utils.ping(hostname)

    return response

Under the hood, shell.execute uses subprocess.check_output to run a native ping:

def ping(hostname):
    command = ['ping', '-c', '1', '-W', '1', '-q', hostname]

    response = shell.execute(command)
    return output_parser.ping(response['results'])

Output

{
    "test-site": {
        "avg": 0.093, "max": 0.093, "mdev": 0.0, "min": 0.093,
        "1.1.1.1": { "avg": null, "max": null, "mdev": null, "min": null},
        "1.2.3.4": { "avg": null, "max": null, "mdev": null, "min": null},
        "127.0.0.1": { "avg": 0.061, "max": 0.061, "mdev": 0.0, "min": 0.061}
    }
}
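
The output_parser.ping helper is not shown in the question. As a rough sketch only, a parser producing the min/avg/max/mdev keys above might look like the following. The function name parse_ping is hypothetical, and the code assumes the summary line printed by Linux iputils ping; other ping implementations format this line differently.

```python
import re

# Hypothetical stand-in for output_parser.ping (not shown in the question).
# Assumes the Linux iputils summary line, for example:
#   rtt min/avg/max/mdev = 0.061/0.061/0.061/0.000 ms
_RTT_RE = re.compile(
    r"rtt min/avg/max/mdev = ([\d.]+)/([\d.]+)/([\d.]+)/([\d.]+) ms"
)


def parse_ping(output):
    """Return min/avg/max/mdev as floats, or None for each if no reply."""
    keys = ("min", "avg", "max", "mdev")
    match = _RTT_RE.search(output)
    if match is None:
        # No summary line: the host did not answer
        return dict.fromkeys(keys)
    return dict(zip(keys, map(float, match.groups())))
```

An unreachable host produces no rtt line, which is why every value is None (null in the JSON) for 1.1.1.1 and 1.2.3.4.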

Question

The pings run sequentially, which makes the request super slow (tens of seconds). How can I speed this up?

【Question comments】:

    Tags: python flask parallel-processing ping flask-restful


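    【Solution 1】:

    It sounds like threads are your best option, because your problem is I/O bound. I'm using a Semaphore so that at most 5 pings run at the same time.

    I pass the response dict to ping. Assigning individual keys in a dict is thread safe in CPython, but if you are considering anything more complex, you should read up on thread safety first.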

    import threading
    
    def get(self, site_id):
        …
        semaphore = threading.Semaphore(5)
        threads = []
    
        for printer in printers:
            hostname = printer['hostname']
            threads.append(threading.Thread(target=network_utils.ping,
                              args=(semaphore, response, site_id, hostname)))
    
        # Start all threads, then wait for all of them to finish.
        # (A bare map() is lazy on Python 3 and would never call start/join.)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    
        return response
    
    def ping(semaphore, response, site_id, hostname):
        # At most 5 threads run this body at once; "with" also releases on error
        with semaphore:
            command = ['ping', '-c', '1', '-W', '1', '-q', hostname]
            # Use a separate name so the shared response dict is not overwritten
            result = shell.execute(command)
            ping_data = output_parser.ping(result['results'])
    
            response[site_id][hostname] = ping_data
    

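    【Comments】:

    • Do I need to acquire()/release() inside the ping() function, or can I do it around threads.append()?
    • You have to do it inside. When appending to threads I am only adding a Thread object to a list; the thread actually starts only when t.start() is called.
    • @ÉdouardLopez BTW, I don't know the length of printers, but if the list is small you can drop the semaphore. If it is too large, though, you don't want 5K threads open at the same time.
    • The printer list should stay under 200 items.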
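
The thread-plus-semaphore pattern discussed here can also be expressed with the standard library's concurrent.futures.ThreadPoolExecutor, which caps the number of worker threads itself. This is a swapped-in alternative, not the answerer's code; run_parallel and its parameters are hypothetical names used for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(task, hostnames, max_workers=5):
    """Call task(hostname) for every hostname, at most max_workers at a time.

    Returns a dict mapping each hostname to the value task returned for it.
    """
    hostnames = list(hostnames)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, so zip pairs them up correctly
        return dict(zip(hostnames, pool.map(task, hostnames)))
```

With the original single-argument ping from the question, the resource could then build its result with something like run_parallel(network_utils.ping, [p['hostname'] for p in printers]). Because each thread returns its value instead of writing into a shared dict, no semaphore or shared state is needed.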
    【Solution 2】:

    Make the subprocesses asynchronous, for example with gevent.

    from gevent import subprocess
    import gevent
    
    def ping(hostname):
        command = ['ping', '-c', '1', '-W', '1', '-q', hostname]
        return subprocess.Popen(command, stdout=subprocess.PIPE)
    
    def get(self, site_id):
        …
        # Start all the pings in parallel, asynchronously
        # Use dict to reference host: ping subprocess
        # as results will come in at different times
        pings = {printer['hostname']: ping(printer['hostname']) 
                 for printer in printers}
        # Wait for all of them to complete
        gevent.wait(pings.values())
        for hostname in pings:
            response[site_id][hostname] = output_parser.ping(pings[hostname].stdout.read())
        return response
    

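    【Comments】:

    • Why modify the ping() function?
    • check_output is a blocking function and always serializes the subprocess calls. To make them asynchronous with gevent, you need to start all the subprocesses in parallel first, and check their output once they have all finished.
    • Note that asynchronous means no threads. Gevent can easily handle hundreds or thousands of parallel subprocesses, especially since yours are I/O bound. With threads, performance suffers once the number of threads exceeds the number of available physical cores.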
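
The start-everything-first, collect-later idea described in these comments does not depend on gevent. As a minimal sketch using only the standard library's subprocess.Popen (not the answerer's gevent code), with run_all as a hypothetical helper name:

```python
import subprocess


def run_all(commands):
    """Start every command first, then collect each one's output.

    commands maps a key (for example a hostname) to an argv list.
    Returns a dict mapping each key to the decoded stdout of its command.
    """
    # Popen returns as soon as the child is spawned, so all children
    # run concurrently while this loop is still launching the rest
    procs = {
        key: subprocess.Popen(argv, stdout=subprocess.PIPE,
                              stderr=subprocess.DEVNULL)
        for key, argv in commands.items()
    }
    # communicate() blocks until that child exits and returns (stdout, stderr)
    return {key: proc.communicate()[0].decode() for key, proc in procs.items()}
```

For the pings in the question, commands could be built as {hostname: ['ping', '-c', '1', '-W', '1', '-q', hostname]} for each printer. The total wait is then roughly that of the slowest ping rather than the sum of all of them.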
    【Solution 3】:

    Upvote Or Duan's answer, as mine is based on his:

    Resource

    class Ping(Resource):
        def get(self, site_id):
            site_hostname = mast_utils.list_sites(site_id)['results'][0]['hostname']
            printers = mast_utils.list_printers(site_id)['results']['channels']
    
            response = network_utils.parellelize(network_utils.ping, site_hostname, printers)
            return response
    
    api.add_resource(Ping, '/ping/<string:site_id>/')
    

    network_utils.py

    import threading
    
    
    def ping(hostname):
        command = ['ping', '-q', hostname,
                   '-w', '1',
                   '-W', '1',
                   '-i', '0.2'
                   ]
    
        response = shell.execute(command)
    
        return output_parser.ping(response['results'])
    
    
    def collect(task, response, **kwargs):
        hostname = kwargs['hostname']
    
        response[hostname] = task(**kwargs)
    
    
    def parellelize(task, site_id, printers, **kwargs):
        response = {}
        kw = kwargs.copy()
        kw.update({'hostname': site_id})
        collect(task, response, **kw)
    
        printers_response = {}
        threads = []
        for printer in printers:
            hostname = printer['hostname']
            kw = kwargs.copy()
            kw.update({'hostname': hostname})
    
            threads.append(
                threading.Thread(
                    target=collect,
                    args=(task, printers_response),
                    kwargs=kw
                )
            )
    
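        # Start every thread before joining any of them; calling join()
        # right after start() would run the pings one at a time
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()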
    
        response[site_id].update(printers_response)
    
        return response
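
Whether the threads in a function like parellelize actually overlap depends on where join() is called. The following self-contained sketch uses time.sleep as a stand-in for a blocking ping; slow_task and timed are hypothetical names, and the 0.2 second delay is arbitrary.

```python
import threading
import time


def slow_task(results, key):
    time.sleep(0.2)  # stand-in for a blocking ping
    results[key] = True


def timed(join_inside_loop, n=4):
    """Run n threads and return (elapsed seconds, results dict)."""
    results = {}
    threads = [threading.Thread(target=slow_task, args=(results, i))
               for i in range(n)]
    started = time.perf_counter()
    for thread in threads:
        thread.start()
        if join_inside_loop:
            thread.join()  # waits for this thread before starting the next
    for thread in threads:
        thread.join()  # returns immediately for threads already finished
    return time.perf_counter() - started, results
```

Calling timed(True) should take about n times the delay, because each thread finishes before the next one starts, while timed(False) should take about one delay in total.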
    

    test_network_utils.py

    import unittest
    
    import network_utils
    
    
    class NetworkUtilsTestCase(unittest.TestCase):
        def test_ping_is_null_when_host_unreachable(self):
            hostname = 'unreachable'
    
            response = network_utils.ping(hostname)
    
            self.assertDictEqual(response, {
                'avg': None,
                'max': None,
                'mdev': None,
                'min': None
            })
    
        def test_ping_reply_time_when_reachable(self):
            hostname = '127.0.0.1'
    
            response = network_utils.ping(hostname)
    
            self.assertGreater(response['avg'], 0)
    
        def test_ping_with_only_a_site(self):
            site_hostname = 'localhost'
            printers = []
            response = {}
    
            response = network_utils.parellelize(network_utils.ping, site_hostname, printers)
    
            self.assertGreater(response[site_hostname]['avg'], 0)
    
        def test_ping_with_printers(self):
            site_hostname = 'localhost'
            printers = [
                {'hostname': '127.0.0.1', 'port': 22},
                {'hostname': '0.0.0.0', 'port': 22},
            ]
    
            response = network_utils.parellelize(network_utils.ping, site_hostname, printers)
    
            self.assertGreater(response[site_hostname]['avg'], 0)
            self.assertGreater(response[site_hostname]['127.0.0.1']['avg'], 0)
    

    【Comments】:
