os.fork()
復(fù)制當(dāng)前進(jìn)程狀態(tài)作為子進(jìn)程。復(fù)制時子進(jìn)程返回0,父進(jìn)程返回子進(jìn)程的pid. 子進(jìn)程可通過os.getppid()
獲取父進(jìn)程的pid.同時os.getpid()
可獲得當(dāng)前進(jìn)程的pid.
import os
print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
結(jié)果:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
windows沒有fork()
.可以通過python提供的通用多進(jìn)程模塊multiprocessing
創(chuàng)建多進(jìn)程.
創(chuàng)建多進(jìn)程需要導(dǎo)入Process
模塊:
from multiprocess import Process
使用
p = Process(target=function, args=(parament,...)
創(chuàng)建子進(jìn)程實例.其中target=
傳入子進(jìn)程需執(zhí)行的函數(shù)本身function
,args傳入函數(shù)需要的參數(shù).參數(shù)數(shù)量不固定.
之后使用
p.start()
運行實例.要等待該子進(jìn)程運行結(jié)束再運行之后的代碼可以使用:
p.join()
以下是一個例子:
from multiprocessing import Process
import os
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
結(jié)果:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.
對于需啟動大量子進(jìn)程的情況,可使用Pool
模塊:
from multiprocessing import Pool
使用:
p = Pool(number)
創(chuàng)建進(jìn)程池.其中number為進(jìn)程池包含子進(jìn)程數(shù)量.不寫默認(rèn)為CPU核數(shù).
使用:
p.apply_async(function, args=(parament,...)
運行子進(jìn)程.
之后需關(guān)閉進(jìn)程池:
p.close()
同時,需等待所有子進(jìn)程運行結(jié)束可使用:
p.join()
以下是一個例子:
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print 'Task %s runs %0.2f seconds.' % (name, (end - start))
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Pool()
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'
結(jié)果:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
進(jìn)程間通訊
不同進(jìn)程間可以通過Queue
,Pipe
來通信.Pipe
用于兩個進(jìn)程間通信,Quene
用于多個進(jìn)程間通信.在只有兩個進(jìn)程通信的情況下Pipe
效率高于Queue
.
Pipe
導(dǎo)入Pipe
模塊:
from multiprocessing import Pipe
創(chuàng)建Pipe通信的兩端(返回一個雙元素的list):
p = Pipe(duplex=False)
其中duplex=False
表示該Pipe只
能單向通信.默認(rèn)不寫該參數(shù)為雙向通信.
p[0]
,p[1]
可以分別作為兩個子進(jìn)程的參數(shù)傳遞給子進(jìn)程函數(shù).也可以只傳遞一端給子進(jìn)程,另一端交給父進(jìn)程.
Pipe
的兩端可通過p.send()
傳送值,p.recv()
接收值.
例子1:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
例子2:
import multiprocessing as mul
def proc1(pipe):
pipe.send('hello')
print('proc1 rec:',pipe.recv())
def proc2(pipe):
print('proc2 rec:',pipe.recv())
pipe.send('hello, too')
# Build a pipe
pipe = mul.Pipe()
# Pass an end of the pipe to process 1
p1 = mul.Process(target=proc1, args=(pipe[0],))
# Pass the other end of the pipe to process 2
p2 = mul.Process(target=proc2, args=(pipe[1],))
p1.start()
p2.start()
p1.join()
p2.join()
Queue
導(dǎo)入Queue
模塊:
from multiprocessing import Queue
創(chuàng)建Queue
對象:
q = Queue(max)
其中max表示對象中可以存放的最大數(shù)量.
q可作為全局變量使用,也可以作為參數(shù)傳遞給子進(jìn)程.
使用q.put()
在Queue
對象中放入需傳遞的值,q.get()
取出值.
例子1:
from multiprocessing import Process,Queue
def writer_proc():
q.put(100)
def reader_proc():
print q.get()
if __name__ == '__main__':
q = Queue()
reader = Process(target=reader_proc,args=(q,))
reader.start()
writer = Process(target=writer_proc,args=(q,))
writer.start()
reader.join()
writer.join()
例子2:
import multiprocessing
q = multiprocessing.Queue()
def reader_proc():
print q.get()
reader = multiprocessing.Process(target=reader_proc)
reader.start()
q.put(100)
reader.join()
多線程
多任務(wù)除了使用多進(jìn)程外還可以使用多線程來完成.單個進(jìn)程中可以有多個線程,它們共享進(jìn)程中的數(shù)據(jù).
python中可使用高級模塊Threading
來創(chuàng)建多線程.其使用方法與multiprocessing
相似.
導(dǎo)入Threading
模塊:
import Threading
*threading 模塊提供的常用方法:
threading.currentThread(): 返回當(dāng)前的線程變量。 (也可以使用threading.current_thread())
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結(jié)束前,不包括啟動前和終止后的線程。
threading.activeCount(): 返回正在運行的線程數(shù)量,與len(threading.enumerate())有相同的結(jié)果。
threading模塊提供的類:
Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.*
構(gòu)建新線程實例:
t = Threading.thread(target=function,...)
同時構(gòu)建實例支持以下幾種方法:
*Thread(group=None, target=None, name=None, args=(), kwargs={})
group: 線程組,目前還沒有實現(xiàn),庫引用中提示必須是None;
target: 要執(zhí)行的方法;
name: 線程名;
args/kwargs: 要傳入方法的參數(shù)。*
實例支持以下方法:
*isAlive(): 返回線程是否在運行。正在運行指啟動后、終止前。
get/setName(name): 獲取/設(shè)置線程名。
is/setDaemon(bool): 獲取/設(shè)置是否守護(hù)線程。初始值從創(chuàng)建該線程的線程繼承。當(dāng)沒有非守護(hù)線程仍在運行時,程序?qū)⒔K止。
start(): 啟動線程。
join([timeout]): 阻塞當(dāng)前上下文環(huán)境的線程,直到調(diào)用此方法的線程終止或到達(dá)指定的timeout(可選參數(shù))。*
使用:
t.start()
運行新線程.
如需等待線程運行結(jié)束:
t.join()
鎖
由于多線程共享進(jìn)程中的變量, 如果直接使用多線程修改變量的話容易出問題.所以多線程中一般會創(chuàng)建鎖.
創(chuàng)建鎖:
lock = threading.Lock()
此時有了一個鎖的實例.
*鎖的實例方法:
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。*
在每個線程需要修改變量前調(diào)用實例方法,嘗試將修改變量的過程置于鎖中,不讓其他線程修改變量:
lock.acquire()
修改之后需要釋放鎖:
lock.release()
例子1:
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要釋放鎖:
lock.release()
例子2:
# encoding: UTF-8
import threading
import time
data = 0
lock = threading.Lock()
def func():
global data
print '%s acquire lock...' % threading.currentThread().getName()
# 調(diào)用acquire([timeout])時,線程將一直阻塞,
# 直到獲得鎖定或者直到timeout秒后(timeout參數(shù)可選)。
# 返回是否獲得鎖。
if lock.acquire():
print '%s get the lock.' % threading.currentThread().getName()
data += 1
time.sleep(2)
print '%s release lock...' % threading.currentThread().getName()
# 調(diào)用release()將釋放鎖。
lock.release()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t3 = threading.Thread(target=func)
t1.start()
t2.start()
t3.start()
其他鎖及線程間通信見參考資料5中.
多線程的全局變量與局部變量
多線程之間修改全局變量需要加鎖. 在線程的函數(shù)中創(chuàng)建局部變量可以解決加鎖問題, 但如果線程需要運行不同函數(shù), 函數(shù)之間需要共享變量, 局部變量調(diào)用不是很方便. threading.local()
可以解決這個問題.
localschool = threading.local()
創(chuàng)建實例后, 不同線程在同時使用實例時不會產(chǎn)生沖突.
例子:
import threading
# 創(chuàng)建全局ThreadLocal對象:
local_school = threading.local()
def process_student():
print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
分布式進(jìn)程(待完善
協(xié)程協(xié)程是單線程在不同的函數(shù)間中斷并相互切換的一種運行模式.比如在函數(shù)A運行遇到阻塞時轉(zhuǎn)向運行函數(shù)B,等到函數(shù)B運行結(jié)束再回來接著運行函數(shù)A.與多線程相比協(xié)程沒有鎖的問題.協(xié)程可以在IO密集的程序中節(jié)省IO等待時間,提高運行效率.
yield
python的生成器yield
一定程度上支持協(xié)程.定義生成器yield
可以直接在函數(shù)定義中將return
換成yield
.在調(diào)用生成器函數(shù)時首先將生成器賦給變量,通過變量的.next()
方法調(diào)用生成器生成第一個值.再次調(diào)用.next()
方法可生成第二個值..send(value)
方法可在調(diào)用生成器時給它傳遞一個參數(shù).
例子:
import time
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)
結(jié)果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
更多關(guān)于yield
見參考資料6.
gevent
gevent模塊為python提供了完整的協(xié)程實現(xiàn).
使用需先導(dǎo)入模塊:
import gevent
在網(wǎng)絡(luò)通信中一般還會導(dǎo)入monkey
模塊以將默認(rèn)socket
替換為可協(xié)程的socket
:
from gevent import monkey
monkey.patch_socket()
或者將所有阻塞式調(diào)用,包括socket
, ssl
, threading
, select
都替換為異步式:
from gevent import monkey
monkey.patch_all()
這種替換調(diào)用一般放在第一行.替換后這些調(diào)用會自動處理阻塞問題.
在需使用協(xié)程時要用:
g = gevent.spawn(function,parament)
使用協(xié)程方式啟動函數(shù).參數(shù)為函數(shù)的參數(shù).
等待任務(wù)結(jié)束可以使用:
g.join()
等待所有任務(wù)結(jié)束可以使用:
gevent.joinall(spawnlist)
不過使用monkey
模塊補丁自動處理有時候不能滿足要求.這時我們可以使用其他模塊.
如自動處理會并發(fā)所有連接,如果需要限制并發(fā)數(shù)量的話可以使用Pool
模塊.
from gevent.pool import Pool
新建一個Pool
池:
p = Pool(number)
number為最高并發(fā)數(shù)
在并發(fā)池中啟動函數(shù):
p.spawn(function,parament)
等待所有任務(wù)結(jié)束:
p.join()
需要直接指定跳轉(zhuǎn)時用sleep
函數(shù):
gevent.sleep(time)
其中time
表示此處至少要阻塞time秒.
參考資料:
<<多進(jìn)程>>
http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868323401155ceb3db1e2044f80b974b469eb06cb43000
<
http://www.coder4.com/archives/3352
<
http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html
<
https://blog.weizhe.net/?p=77
<
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html
<<生成器>>
http://wiki.jikexueyuan.com/project/start-learning-python/215.html
<
http://www.gevent.org/intro.html#installation-and-requirements