一,concurent.furtrue进程池和线程池
1.1 concurent.furtrue 开启进程,多进程&线程,多线程
1 # concurrent.futures创建并行的任务 2 # 进程池 ProcessPoolExecutor,ThreadPoolExecutor 3 # 下面例子是Io密集型,所以时间上比叫多 4 from concurrent.futures import ProcessPoolExecutor 5 import os,time,random 6 def task(n): 7 print('%s is running' %os.getpid()) 8 time.sleep(2) 9 return n**2 10 11 # if __name__ == '__main__': 12 # p=ProcessPoolExecutor(max_workers=4) #进程池 max_workers最大的工作任务 13 # l=[] 14 # start=time.time() 15 # for i in range(10): 16 # obj=p.submit(task,i) #submit 异步提交,后面加入.result就变成同步了 17 # l.append(obj) 18 # p.shutdown() #shutdown(wait=True) == p.close + p.join 19 # print('='*30) 20 # print([obj for obj in l]) #结果内存地址 21 # print([obj.result() for obj in l]) 22 # print(time.time()-start) 23 24 25 #线程池,用线程来探测上面的例子,时间少了很多 26 from concurrent.futures import ThreadPoolExecutor 27 import threading 28 import os,time 29 def task(n): 30 print('%s:%s is running' %(threading.currentThread().getName(),os.getpid())) 31 time.sleep(2) 32 return n**2 33 34 # if __name__ == '__main__': 35 # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5 36 # l=[] 37 # start=time.time() 38 # for i in range(10): 39 # obj=p.submit(task,i).result() 40 # l.append(obj) 41 # p.shutdown() 42 # print('='*30) 43 # print([obj.result() for obj in l]) 44 # print(time.time()-start) 45 46 47 #p.submit(task,i).result()即同步执行,进程池,线程池都有 48 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 49 import os,time,random 50 def task(n): 51 print('%s is running' %os.getpid()) 52 time.sleep(2) 53 return n**2 54 55 # if __name__ == '__main__': 56 # p=ProcessPoolExecutor() 57 # start=time.time() 58 # for i in range(10): 59 # res=p.submit(task,i).result() #同步执行 60 # print(res) 61 # print('='*30) 62 # print(time.time()-start)
1.2 concurrent.futures map方法
1 # concurrent的方法map,前面是一个函数,后面是一个可迭代对象 2 from concurrent.futures import ProcessPoolExecutor 3 import os,time,random 4 def task(n): 5 print('%s is running' %os.getpid()) 6 time.sleep(2) 7 return n**2 8 9 if __name__ == '__main__': 10 p=ProcessPoolExecutor() 11 obj=p.map(task,range(10)) 12 p.shutdown() 13 print('='*30) 14 print(list(obj)) 15 16 17 # 上面的例子改成map 18 # if __name__ == '__main__': 19 # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5 20 # l=[] 21 # for i in range(10): 22 # obj=p.submit(task,i).result() 23 # l.append(obj) 24 # p.shutdown() 25 # print([obj.result() for obj in l]) 26 27 28 # if __name__ == '__main__': 29 # p=ThreadPoolExecutor() #线程池 max_workers=20 一个进程多少线程,默认线程cpu的个数乘以5 30 # obj = p.map(task,range(10)) 就可以了 31 # p.shutdown() 32 # print(list(obj)) 33 34 # *** 没法得到其中一个,因为是迭代器,而且这个东西没有回调函数,只是提交任务的话Map最好
1.3 concurrent.futures 爬网页例子
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 import requests 3 import os 4 import time 5 from threading import currentThread 6 def get_page(url): 7 print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url)) 8 response=requests.get(url) 9 time.sleep(2) 10 return {'url':url,'text':response.text} 11 def parse_page(res): 12 res=res.result() #参数传过来是对象,所以需要得到结果 13 print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url'])) 14 with open('db.txt','a') as f: 15 parse_res='url:%s size:%s\n' %(res['url'],len(res['text'])) 16 f.write(parse_res) 17 if __name__ == '__main__': 18 # p=ProcessPoolExecutor() 19 p=ThreadPoolExecutor() 20 urls = [ 21 'https://www.baidu.com', 22 'https://www.baidu.com', 23 'https://www.baidu.com', 24 'https://www.baidu.com', 25 'https://www.baidu.com', 26 'https://www.baidu.com', 27 ] 28 29 for url in urls: 30 # multiprocessing.pool_obj方式:p.apply_async(get_page,args=(url,),callback=parse_page),这是自己调用get得到结果给回掉函数 31 p.submit(get_page, url).add_done_callback(parse_page) #回掉函数,前面的结果就是参数,这个参数是对象,所以需要get 32 # p.map(get_page,urls) map写法 33 p.shutdown() 34 print('主',os.getpid())
二,协程
一 引子
本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长
ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态
其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:
#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级 #2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
#串行执行
import time
def consumer(res):
'''任务1:接收数据,处理数据'''
pass
def producer():
'''任务2:生产数据'''
res=[]
for i in range(10000000):
res.append(i)
return res
start=time.time()
#串行执行
res=producer()
consumer(res)
stop=time.time()
print(stop-start) #1.5536692142486572
#基于yield并发执行
import time
def consumer():
'''任务1:接收数据,处理数据'''
while True:
x=yield
def producer():
'''任务2:生产数据'''
g=consumer()
next(g)
for i in range(10000000):
g.send(i)
start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()
stop=time.time()
print(stop-start) #2.0272178649902344