一,concurent.furtrue进程池和线程池

1.1 concurent.furtrue 开启进程,多进程&线程,多线程

 1 # concurrent.futures创建并行的任务
 2 # 进程池 ProcessPoolExecutor,ThreadPoolExecutor
 3 # 下面例子是Io密集型,所以时间上比叫多
 4 from concurrent.futures import ProcessPoolExecutor
 5 import os,time,random
 6 def task(n):
 7     print('%s is running' %os.getpid())
 8     time.sleep(2)
 9     return n**2
10 
11 # if __name__ == '__main__':
12 #     p=ProcessPoolExecutor(max_workers=4)    #进程池   max_workers最大的工作任务
13 #     l=[]
14 #     start=time.time()
15 #     for i in range(10):
16 #         obj=p.submit(task,i)    #submit 异步提交,后面加入.result就变成同步了
17 #         l.append(obj)
18 #     p.shutdown()         #shutdown(wait=True) == p.close + p.join
19 #     print('='*30)
20 #     print([obj for obj in l])   #结果内存地址
21 #     print([obj.result() for obj in l])
22 #     print(time.time()-start)
23 
24 
25 #线程池,用线程来探测上面的例子,时间少了很多
26 from concurrent.futures import ThreadPoolExecutor
27 import threading
28 import os,time
29 def task(n):
30     print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
31     time.sleep(2)
32     return n**2
33 
34 # if __name__ == '__main__':
35 #     p=ThreadPoolExecutor()  #线程池  max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5
36 #     l=[]
37 #     start=time.time()
38 #     for i in range(10):
39 #         obj=p.submit(task,i).result()
40 #         l.append(obj)
41 #     p.shutdown()
42 #     print('='*30)
43 #     print([obj.result() for obj in l])
44 #     print(time.time()-start)
45 
46 
47 #p.submit(task,i).result()即同步执行,进程池,线程池都有
48 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
49 import os,time,random
50 def task(n):
51     print('%s is running' %os.getpid())
52     time.sleep(2)
53     return n**2
54 
55 # if __name__ == '__main__':
56 #     p=ProcessPoolExecutor()
57 #     start=time.time()
58 #     for i in range(10):
59 #         res=p.submit(task,i).result()   #同步执行
60 #         print(res)
61 #     print('='*30)
62 #     print(time.time()-start)

 1.2 concurrent.futures map方法

 1 # concurrent的方法map,前面是一个函数,后面是一个可迭代对象
 2 from concurrent.futures import ProcessPoolExecutor
 3 import os,time,random
 4 def task(n):
 5     print('%s is running' %os.getpid())
 6     time.sleep(2)
 7     return n**2
 8 
 9 if __name__ == '__main__':
10     p=ProcessPoolExecutor()
11     obj=p.map(task,range(10))
12     p.shutdown()
13     print('='*30)
14     print(list(obj))
15 
16 
17 # 上面的例子改成map
18 # if __name__ == '__main__':
19 #     p=ThreadPoolExecutor()  #线程池  max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5
20 #     l=[]
21 #     for i in range(10):
22 #         obj=p.submit(task,i).result()
23 #         l.append(obj)
24 #     p.shutdown()
25 #     print([obj.result() for obj in l])
26 
27 
28 # if __name__ == '__main__':
29 #     p=ThreadPoolExecutor()  #线程池  max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5
30 #     obj = p.map(task,range(10))   就可以了
31 #     p.shutdown()
32 #     print(list(obj))
33 
34 # *** 没法得到其中一个,因为是迭代器,而且这个东西没有回调函数,只是提交任务的话Map最好

 

1.3 concurrent.futures 爬网页例子

 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 import requests
 3 import os
 4 import time
 5 from threading import currentThread
 6 def get_page(url):
 7     print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
 8     response=requests.get(url)
 9     time.sleep(2)
10     return {'url':url,'text':response.text}
11 def parse_page(res):
12     res=res.result()   #参数传过来是对象,所以需要得到结果
13     print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
14     with open('db.txt','a') as f:
15         parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
16         f.write(parse_res)
17 if __name__ == '__main__':
18     # p=ProcessPoolExecutor()
19     p=ThreadPoolExecutor()
20     urls = [
21         'https://www.baidu.com',
22         'https://www.baidu.com',
23         'https://www.baidu.com',
24         'https://www.baidu.com',
25         'https://www.baidu.com',
26         'https://www.baidu.com',
27     ]
28 
29     for url in urls:
30         # multiprocessing.pool_obj方式:p.apply_async(get_page,args=(url,),callback=parse_page),这是自己调用get得到结果给回掉函数
31         p.submit(get_page, url).add_done_callback(parse_page)   #回掉函数,前面的结果就是参数,这个参数是对象,所以需要get
32     # p.map(get_page,urls)  map写法
33     p.shutdown()
34     print('',os.getpid())

 

二,协程

一 引子

    本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

    cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长

python开发concurent.furtrue模块:concurent.furtrue的多进程与多线程&协程

ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态 

    其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
#2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换  
#串行执行
import time
def consumer(res):
    '''任务1:接收数据,处理数据'''
    pass

def producer():
    '''任务2:生产数据'''
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
#串行执行
res=producer()
consumer(res)
stop=time.time()
print(stop-start) #1.5536692142486572



#基于yield并发执行
import time
def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
        x=yield

def producer():
    '''任务2:生产数据'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344
单纯地切换反而会降低运行效率

相关文章:

  • 2021-04-21
  • 2021-08-02
  • 2021-12-23
  • 2021-12-10
  • 2022-02-10
  • 2021-09-27
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-01-25
  • 2021-11-20
  • 2021-05-16
相关资源
相似解决方案