概念
多线程和多进程的区别:
cpu全局解释器锁(GIL):
多线程、多进程如何选择:
IO密集型应用:大量的IO操作,调度一颗cpu足以,采用多线程。
主线程:
程序从上往下执行,解释器的执行过程,叫主线程。
多线程
一个简单的代码:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import threadingimport timelock=threading.Lock() #加锁防争抢屏幕输出def run(num): lock.acquire()print "thread......" ,num lock.release() time.sleep(1)for i in range(10): t = threading.Thread(target=run,args=(i,)) #实例化,i后面必须有逗号,每创建一个线程,执行run函数。相当于把i当做num传给run(). t.start() #执行start方法 |
更多方法:
| start | 线程准备就绪,等待CPU调度 | |
| setName | 为线程设置名称 | |
| getName | 获取线程名称 | |
| setDaemon |
设置为前台线程(默认)或后台线程 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止 |
t=thread.Thread...... t.setDeamon(Ture or False) t.start() |
| join | 逐个执行每个线程,执行完毕后继续往下执行...就是先把其它前台线程都执行完了才去执行主线程 | t.start() t.join() #执行到这儿后返回执行其它线程,可以有参数,比如加个2,就是等2s,2s上面的线程没执行完,就往下执行主线程了。 |
| run | 线程被cpu调度后执行此方法 |
线程锁:
如果有一个变量,每个线程获取它后都+=1,那么这时就会出现抢占,而且每个线程拿到的变量数据不一样,有可能是计算之前的,也有可能是计算之后的,最终影响了最后的计算结果。
未加线程锁的代码:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import threading
import time
gl_num = 0
def show(arg):
global gl_num #声明为全局变量
time.sleep(1)
gl_num +=1 #1、gl_num=0+1
print gl_num
for i in range(10): #创建10个线程
t = threading.Thread(target=show, args=(i,))
t.start()
print '主线程已执行完毕。'
|
运行结果:
|
1
2
3
4
|
1234567899 |
可以看到,数字都是随机的,并没有得到预想的结果。
加入线程锁:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import threading
import time
gl_num = 0
lock = threading.RLock() #RLock是递归,Lock是非递归
def Func():
lock.acquire() #当某一个线程拿到这个变量就锁上了,别人不能碰
global gl_num
gl_num +=1 #第一个线程:gl_num=0+1,第二个:gl_num=1+1,第三个:。。。。。。
time.sleep(1)
print gl_num
lock.release() #计算完成,释放锁,下一个线程可以去拿了
for i in range(10):
t = threading.Thread(target=Func)
t.start()
|
运行结果:
|
1
2
3
4
|
主线程已执行完毕。123 |
加入了线程锁的程序结果正确了,注意结果的打印顺序,setDaemon是默认的True,所以主线程执行完毕后,等待前台线程也执行完成后,程序停止。
event:
setDaemon的作用主要是主线程是否等待其他线程,而这个event(python线程的事件)就可以控制其它的线程,通过设定标志位来控制子线程何时执行,它(通过三个方法:set、wait、clear)能让子线程停下来,也能够让子线程继续。
三个方法,一个字段
三个方法:set、wait、clear
事件处理的机制:
在event内部定义了一个标志位“Flag”,这个“Flag”你看不见,当:
-
如果“Flag”值为 False(执行
event_obj.clear()设定),那么当程序执行 event.wait( )时就会阻塞; -
如果“Flag”值为True(执行
event_obj.set()设定),那么event.wait 方法时便不再阻塞。
代码:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import threading
def do(event):
print 'start'
event.wait()
print 'execute'
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear()inp = raw_input('input:')
if inp == 'true':
event_obj.set()
|
使用event,要先实例化,创建event_obj,然后才能执行它里面的三个方法set、wait、clear
clear( ):将“Flag”设置为False
set( ):将“Flag”设置为True
执行结果:
|
1
2
3
4
5
6
7
8
9
|
startstartstart…… input:true
executeexecuteexecute…… |
开始,主线程先把标志位设定为False,每个子线程执行了打印“start”任务,然后因为event.wait()就阻塞住了,直到input:true,标志位设定成了True,下面的打印“execute”任务才会被执行。
|
1
2
3
4
5
6
7
8
|
from multiprocessing import Process
def foo(i):
print 'say hi',i
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
|
和创建多线程代码类似,只不过导入的模块不一样而已。
创建和cpu-cores相等的进程,能最大化利用cpu,也是最合理的数量。
多进程因为内存不能共享,所以创建进程需要非常大的系统资源开销。数据怎么才能共享呢?
进程间数据共享:
1、用一个特殊的数据结构,Array数组,声明后,其它进程能使用了。
|
1
2
3
4
5
6
7
8
9
10
11
12
|
#方法一,公共数组Arrayfrom multiprocessing import Process,Array
temp = Array('i', [11,22,33,44])
def Foo(i):
temp[i] = 100+i
for item in temp:
print i,'----->',item
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
|
2、公共数据字典manage.dict()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
#方法二:manage.dict()共享数据from multiprocessing import Process,Manager
manage = Manager()
dic = manage.dict()
def Foo(i):
dic[i] = 100+i
print dic.values()
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()
|
既然进程之间数据能够共享,那么势必会造成争抢,形成脏数据,so,和进程一样,要加一个锁。
进程锁:
进程锁的使用方法和线程锁一样,只不过调用的方法不同而已。
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from multiprocessing import Process, Array, RLock
def Foo(lock,temp,i):
"""
将第0个数加100
"""
lock.acquire()
temp[0] = 100+i
for item in temp:
print i,'----->',item
lock.release()
lock = RLock()
temp = Array('i', [11, 22, 33, 44])
for i in range(20):
p = Process(target=Foo,args=(lock,temp,i,))
p.start()
|
进程池:
Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
创建进程池代码:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
from multiprocessing import Process,Pool
import time
def Foo(i):
time.sleep(2)
return i+100
def Bar(arg): #arg是Foo函数的返回值
print arg
pool = Pool(5) #创建5个池进程
#Pool类怎么来的?from multiprocessing导入的时候,加载了__init__.py中的Pool函数,这个函数又把pool.py中的class Pool加载进内存print pool.apply(Foo,(1,)) #同步模式,一个一个执行
print pool.apply_async(func =Foo, args=(1,)).get() #异步模式,同时执行所有进程
for i in range(10): #进程池只有5个进程连接,所以会5个5个执行
pool.apply_async(func=Foo, args=(i,),callback=Bar) #执行完func再执行callback
print 'end'
pool.close() #不再接受新的请求
#pool.terminate() #立即关闭,不管执行完没有,如果写了这句,后面的join就没有意义了。pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
|
执行结果:
注意:在windows上执行要加入 if __name__=="__main__": ,否则会报错。
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
[root@localhost ]# python process_pool.py
101101end100101102103104105106107108109 |
函数解释:
-
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的;
-
close() 关闭pool,使其不在接受新的任务。
-
terminate() 结束工作进程,不再处理未完成的任务。
-
join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
部分来源:http://www.cnblogs.com/kaituorensheng/p/4465768.html
协程
相关概念:
定义:
对线程的一个分片
与进程、线程的区别:
线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。
存在的意义:
对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(上下文切换开销,保存状态,下次继续)。
协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序,效率更高。
适用场景:
当程序中存在大量不需要CPU的操作时(IO),适用于协程;如,网络爬虫。
协程代码:greenlet(需要安装,用处不大。)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# 下载地址:https://pypi.python.org/packages/2.7/g/greenlet/greenlet-0.4.9-py2.7-win-amd64.egg#md5=3449dcbdee3d387d1f065f1fd2f6eb88# 参考《python安装.egg文件》安装from greenlet import greenlet
def test1():
print 1
gr2.switch() #切换到执行test2函数,并记录执行到这里的标记,下次再switch回来的时候,从这里接着执行。
print 2
gr2.switch()
def test2():
print 3
gr1.switch()
print 4
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch() #和yield相似,swithch()的时候先暂停,执行下一个。 |
执行结果:
|
1
2
3
4
|
1324 |
greenlet是主动切换,当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
由于IO操作非常耗时,经常使程序处于等待状态,第三方的gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
genvent:
genvent是对greenlet的一个封装,同时发多个IO请求,不阻塞。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from gevent import monkey; monkey.patch_all()
import gevent
import urllib2
def f(url):
print('GET: %s' % url)
resp = urllib2.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([]) |
运行结果:
|
1
2
3
4
5
6
|
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
45661 bytes received from https://www.python.org/.
14823 bytes received from https://github.com/.
304034 bytes received from https://www.yahoo.com/.
|
从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。