1. 多進(jìn)程的含義
進(jìn)程(Process)是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位。
顧名思義,多進(jìn)程就是啟用多個(gè)進(jìn)程同時(shí)運(yùn)行。由于進(jìn)程是線程的集合,而且進(jìn)程是由一個(gè)或多個(gè)線程構(gòu)成的,所以多進(jìn)程的運(yùn)行意味著有大于或等于進(jìn)程數(shù)量的線程在運(yùn)行。
2. Python 多進(jìn)程的優(yōu)勢(shì)
通過上一課時(shí)我們知道,由于進(jìn)程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢(shì),一個(gè)進(jìn)程中的多個(gè)線程,在同一時(shí)刻只能有一個(gè)線程運(yùn)行。
而對(duì)于多進(jìn)程來說,每個(gè)進(jìn)程都有屬于自己的 GIL,所以,在多核處理器下,多進(jìn)程的運(yùn)行是不會(huì)受 GIL 的影響的。因此,多進(jìn)程能更好地發(fā)揮多核的優(yōu)勢(shì)。
當(dāng)然,對(duì)于爬蟲這種 IO 密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對(duì)于計(jì)算密集型任務(wù)來說,Python 的多進(jìn)程相比多線程,其多核運(yùn)行效率會(huì)有成倍的提升。
總的來說,Python 的多進(jìn)程整體來看是比多線程更有優(yōu)勢(shì)的。所以,在條件允許的情況下,能用多進(jìn)程就盡量用多進(jìn)程。
不過值得注意的是,由于進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位,所以各個(gè)進(jìn)程之間的數(shù)據(jù)是無法共享的,如多個(gè)進(jìn)程無法共享一個(gè)全局變量,進(jìn)程之間的數(shù)據(jù)共享需要有單獨(dú)的機(jī)制來實(shí)現(xiàn),這在后面也會(huì)講到。
3. 多進(jìn)程的實(shí)現(xiàn)
在 Python 中也有內(nèi)置的庫(kù)來實(shí)現(xiàn)多進(jìn)程,它就是 multiprocessing。多線程在 IO 密集型用的比較多,也就是在爬蟲方面用的比較多。而 CPU 密集型根本就不用多線程。
我們一般的策略是,多進(jìn)程加多線程,這樣的結(jié)合是最好。
multiprocessing 提供了一系列的組件,如 Process(進(jìn)程)、Queue(隊(duì)列)、Semaphore(信號(hào)量)、Pipe(管道)、Lock(鎖)、Pool(進(jìn)程池)等,接下來讓我們來了解下它們的使用方法。
4. 直接使用 Process 類
在 multiprocessing 中,每一個(gè)進(jìn)程都用一個(gè) Process 類來表示。它的 API 調(diào)用如下:
Process([group [, target [, name [, args [, kwargs]]]]])
target 表示調(diào)用對(duì)象,你可以傳入方法的名字。
args 表示被調(diào)用對(duì)象的位置參數(shù)元組,比如 target 是函數(shù) func,他有兩個(gè)參數(shù) m,n,那么 args 就傳入 [m, n] 即可。
kwargs 表示調(diào)用對(duì)象的字典。
name 是別名,相當(dāng)于給這個(gè)進(jìn)程取一個(gè)名字。
group 分組。
我們先用一個(gè)實(shí)例來感受一下:
# 示例一:
import multiprocessing
def process(index):
print(f"Process: {index}")
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(i,))
p.start()
# 示例二
import multiprocessing
import time
def start(i):
time.sleep(3)
print(i)
# current process
# 當(dāng)前進(jìn)程
print(multiprocessing.current_process().name) # 當(dāng)前進(jìn)程的名字
print(multiprocessing.current_process().pid) # 進(jìn)程控制符
print(multiprocessing.current_process().is_alive()) # 判斷進(jìn)程是否存活
# 因?yàn)?,我們有些進(jìn)程卡死,所以我就要自己把進(jìn)程卡死
if __name__ == '__main__':
print('start')
p = multiprocessing.Process(target=start, args=(1,), name='p1')
p.start()
print('stop')
這是一個(gè)實(shí)現(xiàn)多進(jìn)程最基礎(chǔ)的方式:通過創(chuàng)建 Process 來新建一個(gè)子進(jìn)程,其中 target 參數(shù)傳入方法名,args 是方法的參數(shù),是以元組的形式傳入,其和被調(diào)用的方法 process 的參數(shù)是一一對(duì)應(yīng)的。
注意:這里 args 必須要是一個(gè)元組,如果只有一個(gè)參數(shù),那也要在元組第一個(gè)元素后面加一個(gè)逗號(hào),如果沒有逗號(hào)則和單個(gè)元素本身沒有區(qū)別,無法構(gòu)成元組,導(dǎo)致參數(shù)傳遞出現(xiàn)問題。
創(chuàng)建完進(jìn)程之后,我們通過調(diào)用 start 方法即可啟動(dòng)進(jìn)程了。示例一運(yùn)行結(jié)果如下:
Process: 0
Process: 1
Process: 2
Process: 3
Process: 4
可以看到,我們運(yùn)行了 5 個(gè)子進(jìn)程,每個(gè)進(jìn)程都調(diào)用了 process方法。process方法的 index參數(shù)通過 Process的 args傳入,分別是0~4 這 5 個(gè)序號(hào),最后打印出來,5 個(gè)子進(jìn)程運(yùn)行結(jié)束。
由于進(jìn)程是 Python 中最小的資源分配單元,因此這些進(jìn)程和線程不同,各個(gè)進(jìn)程之間的數(shù)據(jù)是不會(huì)共享的,每啟動(dòng)一個(gè)進(jìn)程,都會(huì)獨(dú)立分配資源。
另外,在當(dāng)前 CPU 核數(shù)足夠的情況下,這些不同的進(jìn)程會(huì)分配給不同的 CPU核來運(yùn)行,實(shí)現(xiàn)真正的并行執(zhí)行。
multiprocessing還提供了幾個(gè)比較有用的方法,如我們可以通過 cpu_count 的方法來獲取當(dāng)前機(jī)器 CPU的核心數(shù)量,通過 active_children方法獲取當(dāng)前還在運(yùn)行的所有進(jìn)程。
下面通過一個(gè)實(shí)例來看一下:
import multiprocessing
import time
def process(index):
time.sleep(index)
print(f"Process: {index}")
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=[i,])
p.start()
print(f"CPU number: {multiprocessing.cpu_count()}")
for p in multiprocessing.active_children():
print(f"Child process name: {p.name} id: {p.pid}")
print("Process Ended")
運(yùn)行結(jié)果如下:
Process: 0
CPU number: 8
Child process name: Process-5 id: 73595
Child process name: Process-2 id: 73592
Child process name: Process-3 id: 73593
Child process name: Process-4 id: 73594
Process Ended
Process: 1
Process: 2
Process: 3
Process: 4
在上面的例子中我們通過 cpu_count成功獲取了 CPU 核心的數(shù)量:8 個(gè),當(dāng)然不同的機(jī)器結(jié)果可能不同。
另外我們還通過 active_children獲取到了當(dāng)前正在活躍運(yùn)行的進(jìn)程列表。然后我們遍歷了每個(gè)進(jìn)程,并將它們的名稱和進(jìn)程號(hào)打印出來了,這里進(jìn)程號(hào)直接使用 pid屬性即可獲取,進(jìn)程名稱直接通過 name 屬性即可獲取。
以上我們就完成了多進(jìn)程的創(chuàng)建和一些基本信息的獲取。
5. 繼承 Process 類
在上面的例子中,我們創(chuàng)建進(jìn)程是直接使用 Process這個(gè)類來創(chuàng)建的,這是一種創(chuàng)建進(jìn)程的方式。不過,創(chuàng)建進(jìn)程的方式不止這一種,同樣,我們也可以像線程 Thread一樣來通過繼承的方式創(chuàng)建一個(gè)進(jìn)程類,進(jìn)程的基本操作我們?cè)谧宇惖?run方法中實(shí)現(xiàn)即可。
通過一個(gè)實(shí)例來看一下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.start()
我們首先聲明了一個(gè)構(gòu)造方法,這個(gè)方法接收一個(gè) loop參數(shù),代表循環(huán)次數(shù),并將其設(shè)置為全局變量。在 run方法中,又使用這個(gè) loop變量循環(huán)了 loop次并打印了當(dāng)前的進(jìn)程號(hào)和循環(huán)次數(shù)。
在調(diào)用時(shí),我們用 range 方法得到了 2、3、4 三個(gè)數(shù)字,并把它們分別初始化了 MyProcess 進(jìn)程,然后調(diào)用 start 方法將進(jìn)程啟動(dòng)起來。
注意:這里進(jìn)程的執(zhí)行邏輯需要在 run 方法中實(shí)現(xiàn),啟動(dòng)進(jìn)程需要調(diào)用 start 方法,調(diào)用之后 run 方法便會(huì)執(zhí)行。
運(yùn)行結(jié)果如下:
Pid: 73667 LoopCount: 0
Pid: 73668 LoopCount: 0
Pid: 73669 LoopCount: 0
Pid: 73667 LoopCount: 1
Pid: 73668 LoopCount: 1
Pid: 73669 LoopCount: 1
Pid: 73668 LoopCount: 2
Pid: 73669 LoopCount: 2
Pid: 73669 LoopCount: 3
可以看到,三個(gè)進(jìn)程分別打印出了 2、3、4 條結(jié)果,即進(jìn)程 73667 打印了 2 次 結(jié)果,進(jìn)程 73668 打印了 3 次結(jié)果,進(jìn)程 73669 打印了 4 次結(jié)果。
注意,這里的進(jìn)程 pid 代表進(jìn)程號(hào),不同機(jī)器、不同時(shí)刻運(yùn)行結(jié)果可能不同。
通過上面的方式,我們也非常方便地實(shí)現(xiàn)了一個(gè)進(jìn)程的定義。為了復(fù)用方便,我們可以把一些方法寫在每個(gè)進(jìn)程類里封裝好,在使用時(shí)直接初始化一個(gè)進(jìn)程類運(yùn)行即可。
PID(進(jìn)程控制符)英文全稱為 Process Identifier,它也屬于電工電子類技術(shù)術(shù)語(yǔ)。
PID就是各進(jìn)程的身份標(biāo)識(shí),程序一運(yùn)行系統(tǒng)就會(huì)自動(dòng)分配給進(jìn)程一個(gè)獨(dú)一無二的PID。進(jìn)程中止后PID被系統(tǒng)回收,可能會(huì)被繼續(xù)分配給新運(yùn)行的程序。
PID一列代表了各進(jìn)程的進(jìn)程ID,也就是說,PID就是各進(jìn)程的身份標(biāo)識(shí)。
在實(shí)際調(diào)試中,只能先大致設(shè)定一個(gè)經(jīng)驗(yàn)值,然后根據(jù)調(diào)節(jié)效果修改。
6. 守護(hù)進(jìn)程
在多進(jìn)程中,同樣存在守護(hù)進(jìn)程的概念,如果一個(gè)進(jìn)程被設(shè)置為守護(hù)進(jìn)程,當(dāng)父進(jìn)程結(jié)束后,子進(jìn)程會(huì)自動(dòng)被終止,我們可以通過設(shè)置 daemon 屬性來控制是否為守護(hù)進(jìn)程。
還是原來的例子,增加了 deamon屬性的設(shè)置:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()
print("Main Process ended")
運(yùn)行結(jié)果如下:
Main Process ended
結(jié)果很簡(jiǎn)單,因?yàn)橹鬟M(jìn)程沒有做任何事情,直接輸出一句話結(jié)束,所以在這時(shí)也直接終止了子進(jìn)程的運(yùn)行。
這樣可以有效防止無控制地生成子進(jìn)程。這樣的寫法可以讓我們?cè)谥鬟M(jìn)程運(yùn)行結(jié)束后無需額外擔(dān)心子進(jìn)程是否關(guān)閉,避免了獨(dú)立子進(jìn)程的運(yùn)行。
7. 進(jìn)程等待
上面的運(yùn)行效果其實(shí)不太符合我們預(yù)期:主進(jìn)程運(yùn)行結(jié)束時(shí),子進(jìn)程(守護(hù)進(jìn)程)也都退出了,子進(jìn)程什么都沒來得及執(zhí)行。
能不能讓所有子進(jìn)程都執(zhí)行完了然后再結(jié)束呢?當(dāng)然是可以的,只需要加入 join 方法即可,我們可以將代碼改寫如下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
print("Main Process start")
processes = []
for i in range(2, 5):
p = MyProcess(i)
processes.append(p)
p.daemon = True
p.start()
for p in processes:
p.join()
print("Main Process ended")
運(yùn)行結(jié)果如下:
Main Process start
Pid: 11776 LoopCount: 0
Pid: 16824 LoopCount: 0
Pid: 10552 LoopCount: 0
Pid: 11776 LoopCount: 1
Pid: 16824 LoopCount: 1
Pid: 10552 LoopCount: 1
Pid: 16824 LoopCount: 2
Pid: 10552 LoopCount: 2
Pid: 10552 LoopCount: 3
Main Process ended
在調(diào)用 start和 join方法后,父進(jìn)程就可以等待所有子進(jìn)程都執(zhí)行完畢后,再打印出結(jié)束的結(jié)果。
默認(rèn)情況下,join是無限期的。也就是說,如果有子進(jìn)程沒有運(yùn)行完畢,主進(jìn)程會(huì)一直等待。這種情況下,如果子進(jìn)程出現(xiàn)問題陷入了死循環(huán),主進(jìn)程也會(huì)無限等待下去。
怎么解決這個(gè)問題呢?
可以給 join 方法傳遞一個(gè)超時(shí)參數(shù),代表最長(zhǎng)等待秒數(shù)。如果子進(jìn)程沒有在這個(gè)指定秒數(shù)之內(nèi)完成,會(huì)被強(qiáng)制返回,主進(jìn)程不再會(huì)等待。也就是說這個(gè)參數(shù)設(shè)置了主進(jìn)程等待該子進(jìn)程的最長(zhǎng)時(shí)間。
例如這里我們傳入 1,代表最長(zhǎng)等待 1 秒,代碼改寫如下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print(f"Pid: {self.pid} LoopCount: {count}")
if __name__ == '__main__':
processes = []
for i in range(3, 5):
p = MyProcess(i)
processes.append(p)
p.daemon = True
p.start()
for p in processes:
p.join(1)
print("Main Process ended")
運(yùn)行結(jié)果如下:
Pid: 40970 LoopCount: 0
Pid: 40971 LoopCount: 0
Pid: 40970 LoopCount: 1
Pid: 40971 LoopCount: 1
Main Process ended
可以看到,有的子進(jìn)程本來要運(yùn)行 3 秒,結(jié)果運(yùn)行 1 秒就被強(qiáng)制返回了,由于是守護(hù)進(jìn)程,該子進(jìn)程被終止了。
到這里,我們就了解了守護(hù)進(jìn)程、進(jìn)程等待和超時(shí)設(shè)置的用法。
8. 終止進(jìn)程
當(dāng)然,終止進(jìn)程不止有守護(hù)進(jìn)程這一種做法,我們也可以通過 terminate 方法來終止某個(gè)子進(jìn)程,另外我們還可以通過 is_alive方法判斷進(jìn)程是否還在運(yùn)行。
下面我們來看一個(gè)實(shí)例:
import multiprocessing, time
def process():
print("Starting")
time.sleep(5)
print("Finished")
if __name__ == '__main__':
p = multiprocessing.Process(target=process)
print("Before:", p, p.is_alive())
p.start()
print("During:", p, p.is_alive())
p.terminate()
print("Terminate:", p, p.is_alive())
p.join()
print("Joined:", p, p.is_alive())
在上面的例子中,我們用 Process創(chuàng)建了一個(gè)進(jìn)程,接著調(diào)用 start方法啟動(dòng)這個(gè)進(jìn)程,然后調(diào)用 terminate方法將進(jìn)程終止,最后調(diào)用 join方法。
另外,在進(jìn)程運(yùn)行不同的階段,我們還通過 is_alive方法判斷當(dāng)前進(jìn)程是否還在運(yùn)行。
運(yùn)行結(jié)果如下:
Before: False
During: True
Terminate: True
Joined: False
這里有一個(gè)值得注意的地方,在調(diào)用 terminate 方法之后,我們用 is_alive 方法獲取進(jìn)程的狀態(tài)發(fā)現(xiàn)依然還是運(yùn)行狀態(tài)。在調(diào)用 join 方法之后,is_alive 方法獲取進(jìn)程的運(yùn)行狀態(tài)才變?yōu)榻K止?fàn)顟B(tài)。
所以,在調(diào)用 terminate 方法之后,記得要調(diào)用一下 join 方法,這里調(diào)用 join 方法可以為進(jìn)程提供時(shí)間來更新對(duì)象狀態(tài),用來反映出最終的進(jìn)程終止效果。
9. 進(jìn)程互斥鎖
在上面的一些實(shí)例中,我們可能會(huì)遇到如下的運(yùn)行結(jié)果:
Pid: 73993 LoopCount: 0
Pid: 73993 LoopCount: 1
Pid: 73994 LoopCount: 0Pid: 73994 LoopCount: 1
Pid: 73994 LoopCount: 2
Pid: 73995 LoopCount: 0
Pid: 73995 LoopCount: 1
Pid: 73995 LoopCount: 2
Pid: 73995 LoopCount: 3
Main Process ended
我們發(fā)現(xiàn),有的輸出結(jié)果沒有換行。這是什么原因造成的呢?
這種情況是由多個(gè)進(jìn)程 并行執(zhí)行 導(dǎo)致的,兩個(gè)進(jìn)程同時(shí)進(jìn)行了輸出,結(jié)果第一個(gè)進(jìn)程的換行沒有來得及輸出,第二個(gè)進(jìn)程就輸出了結(jié)果,導(dǎo)致最終輸出沒有換行。
那如何來避免這種問題?
如果我們能保證,多個(gè)進(jìn)程運(yùn)行期間的任一時(shí)間,只能一個(gè)進(jìn)程輸出,其他進(jìn)程等待,等剛才那個(gè)進(jìn)程輸出完畢之后,另一個(gè)進(jìn)程再進(jìn)行輸出,這樣就不會(huì)出現(xiàn)輸出沒有換行的現(xiàn)象了。
這種解決方案實(shí)際上就是實(shí)現(xiàn)了進(jìn)程互斥,避免了多個(gè)進(jìn)程同時(shí)搶占臨界區(qū)(輸出)資源。
我們可以通過 multiprocessing 中的 Lock 來實(shí)現(xiàn)。Lock ,即鎖,在一個(gè)進(jìn)程輸出時(shí),加鎖,其他進(jìn)程等待。等此進(jìn)程執(zhí)行結(jié)束后,釋放鎖,其他進(jìn)程可以進(jìn)行輸出。
我們首先實(shí)現(xiàn)一個(gè)不加鎖的實(shí)例,代碼如下:
from multiprocessing import Process, Lock
import time
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
# self.lock.acquire()
print(f"Pid: {self.pid} LoopCount: {count}")
# self.lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
p.start()
運(yùn)行結(jié)果如下:
Pid: 74030 LoopCount: 0
Pid: 74031 LoopCount: 0
Pid: 74032 LoopCount: 0
Pid: 74033 LoopCount: 0
Pid: 74034 LoopCount: 0
Pid: 74030 LoopCount: 1
Pid: 74031 LoopCount: 1
Pid: 74032 LoopCount: 1Pid: 74033 LoopCount: 1
Pid: 74034 LoopCount: 1
Pid: 74030 LoopCount: 2
...
可以看到運(yùn)行結(jié)果中有些輸出已經(jīng)出現(xiàn)了不換行的問題。
我們對(duì)其加鎖,取消掉剛才代碼中的兩行注釋,重新運(yùn)行,運(yùn)行結(jié)果如下:
Pid: 74061 LoopCount: 0
Pid: 74062 LoopCount: 0
Pid: 74063 LoopCount: 0
Pid: 74064 LoopCount: 0
Pid: 74065 LoopCount: 0
Pid: 74061 LoopCount: 1
Pid: 74062 LoopCount: 1
Pid: 74063 LoopCount: 1
Pid: 74064 LoopCount: 1
Pid: 74065 LoopCount: 1
Pid: 74061 LoopCount: 2
Pid: 74062 LoopCount: 2
Pid: 74064 LoopCount: 2
...
這時(shí)輸出效果就正常了。
所以,在訪問一些臨界區(qū)資源時(shí),使用 Lock可以有效避免進(jìn)程同時(shí)占用資源而導(dǎo)致的一些問題。
10. 信號(hào)量
信號(hào)量(Semaphore),有時(shí)被稱為信號(hào)燈,是在多線程環(huán)境下使用的一種設(shè)施,是可以用來保證兩個(gè)或多個(gè)關(guān)鍵代碼段不被并發(fā)調(diào)用。在進(jìn)入一個(gè)關(guān)鍵代碼段之前,線程必須獲取一個(gè)信號(hào)量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號(hào)量。其它想進(jìn)入該關(guān)鍵代碼段的線程必須等待直到第一個(gè)線程釋放信號(hào)量。為了完成這個(gè)過程,需要?jiǎng)?chuàng)建一個(gè)信號(hào)量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個(gè)關(guān)鍵代碼段的首末端。確認(rèn)這些信號(hào)量VI引用的是初始創(chuàng)建的信號(hào)
以一個(gè)停車場(chǎng)的運(yùn)作為例。簡(jiǎn)單起見,假設(shè)停車場(chǎng)只有三個(gè)車位,一開始三個(gè)車位都是空的。這時(shí)如果同時(shí)來了五輛車,看門人允許其中三輛直接進(jìn)入,然后放下車攔,剩下的車則必須在入口等待,此后來的車也都不得不在入口處等待。這時(shí),有一輛車離開停車場(chǎng),看門人得知后,打開車攔,放入外面的一輛進(jìn)去,如果又離開兩輛,則又可以放入兩輛,如此往復(fù)。
在這個(gè)停車場(chǎng)系統(tǒng)中,車位是公共資源,每輛車好比一個(gè)線程,看門人起的就是信號(hào)量的作用。
進(jìn)程互斥鎖可以使同一時(shí)刻只有一個(gè)進(jìn)程能訪問共享資源,如上面的例子所展示的那樣,在同一時(shí)刻只能有一個(gè)進(jìn)程輸出結(jié)果。但有時(shí)候我們需要允許多個(gè)進(jìn)程來訪問共享資源,同時(shí)還需要限制能訪問共享資源的進(jìn)程的數(shù)量。
這種需求該如何實(shí)現(xiàn)呢?
可以用信號(hào)量,信號(hào)量是進(jìn)程同步過程中一個(gè)比較重要的角色。它可以控制臨界資源的數(shù)量,實(shí)現(xiàn)多個(gè)進(jìn)程同時(shí)訪問共享資源,限制進(jìn)程的并發(fā)量。
如果你學(xué)過操作系統(tǒng),那么一定對(duì)這方面非常了解,如果你還不了解信號(hào)量是什么,可以先熟悉一下這個(gè)概念。
我們可以用 multiprocessing 庫(kù)中的 Semaphore 來實(shí)現(xiàn)信號(hào)量。
那么接下來我們就用一個(gè)實(shí)例來演示一下進(jìn)程之間利用 Semaphore做到多個(gè)進(jìn)程共享資源,同時(shí)又限制同時(shí)可訪問的進(jìn)程數(shù)量,代碼如下:
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
buffer.get()
print('Consumer pop an element')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
buffer.put(1)
print('Producer append an element')
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print("Main Process Ended")
如上代碼實(shí)現(xiàn)了經(jīng)典的生產(chǎn)者和消費(fèi)者問題。它定義了兩個(gè)進(jìn)程類,一個(gè)是消費(fèi)者,一個(gè)是生產(chǎn)者。
另外,這里使用 multiprocessing 中的 Queue 定義了一個(gè)共享隊(duì)列,然后定義了兩個(gè)信號(hào)量 Semaphore,一個(gè)代表緩沖區(qū)空余數(shù),一個(gè)表示緩沖區(qū)占用數(shù)。
生產(chǎn)者 Producer使用 acquire方法來占用一個(gè)緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來進(jìn)行加鎖,對(duì)緩沖區(qū)進(jìn)行操作,然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費(fèi)者則相反。
運(yùn)行結(jié)果如下:
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
Consumer pop an element
Consumer pop an element
Producer append an element
Producer append an element
我們發(fā)現(xiàn)兩個(gè)進(jìn)程在交替運(yùn)行,生產(chǎn)者先放入緩沖區(qū)物品,然后消費(fèi)者取出,不停地進(jìn)行循環(huán)。 你可以通過上面的例子來體會(huì)信號(hào)量 Semaphore 的用法,通過 Semaphore 我們很好地控制了進(jìn)程對(duì)資源的并發(fā)訪問數(shù)量。
11. 隊(duì)列
在上面的例子中我們使用 Queue作為進(jìn)程通信的共享隊(duì)列使用。
而如果我們把上面程序中的 Queue 換成普通的 list,是完全起不到效果的,因?yàn)檫M(jìn)程和進(jìn)程之間的資源是不共享的。即使在一個(gè)進(jìn)程中改變了這個(gè) list,在另一個(gè)進(jìn)程也不能獲取到這個(gè) list 的狀態(tài),所以聲明全局變量對(duì)多進(jìn)程是沒有用處的。
那進(jìn)程如何共享數(shù)據(jù)呢?
可以用 Queue,即隊(duì)列。當(dāng)然這里的隊(duì)列指的是 multiprocessing里面的 Queue。
依然用上面的例子,我們一個(gè)進(jìn)程向隊(duì)列中放入隨機(jī)數(shù)據(jù),然后另一個(gè)進(jìn)程取出數(shù)據(jù)。
from multiprocessing import Process, Semaphore, Lock, Queue
import time
from random import random
buffer = Queue(10)
empty = Semaphore(2)
full = Semaphore(0)
lock = Lock()
class Consumer(Process):
def run(self):
global buffer, empty, full, lock
while True:
full.acquire()
lock.acquire()
print(f'Consumer get {buffer.get()}')
time.sleep(1)
lock.release()
empty.release()
class Producer(Process):
def run(self):
global buffer, empty, full, lock
while True:
empty.acquire()
lock.acquire()
num = random()
print(f'Producer put {num}')
buffer.put(num)
time.sleep(1)
lock.release()
full.release()
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print("Main Process Ended")
運(yùn)行結(jié)果如下:
Producer put 0.719213647437
Producer put 0.44287326683
Consumer get 0.719213647437
Consumer get 0.44287326683
Producer put 0.722859424381
Producer put 0.525321338921
Consumer get 0.722859424381
Consumer get 0.525321338921
·······
在上面的例子中我們聲明了兩個(gè)進(jìn)程,一個(gè)進(jìn)程為生產(chǎn)者 Producer,另一個(gè)為消費(fèi)者 Consumer,生產(chǎn)者不斷向 Queue里面添加隨機(jī)數(shù),消費(fèi)者不斷從隊(duì)列里面取隨機(jī)數(shù)。
生產(chǎn)者在放數(shù)據(jù)的時(shí)候調(diào)用了 Queue 的 put 方法,消費(fèi)者在取的時(shí)候使用了 get 方法,這樣我們就通過 Queue 實(shí)現(xiàn)兩個(gè)進(jìn)程的數(shù)據(jù)共享了。
上面的例子有些不好理解,接下來,我們來用個(gè)比較簡(jiǎn)單的例子:
Python 多進(jìn)程之間是默認(rèn)無法通信的,因?yàn)槭遣⑿袌?zhí)行的。所以需要借助其他數(shù)據(jù)結(jié)構(gòu)。
舉個(gè)例子:
你一個(gè)進(jìn)程抓取到數(shù)據(jù),要給另一個(gè)進(jìn)程用,就需要進(jìn)程通信。
隊(duì)列:就像排隊(duì)一樣,先進(jìn)先出。也就是你先放進(jìn)去的數(shù)據(jù),也就先取出數(shù)據(jù)。
棧:主要用在 C 和 C++ 上的數(shù)據(jù)結(jié)構(gòu)。主要存儲(chǔ)用戶自定義的數(shù)據(jù)。它是后進(jìn)先出。先進(jìn)去的墊在底層,后進(jìn)的在上面。
from multiprocessing import Process, Queue
# Process :進(jìn)程
# Queue :隊(duì)列
# import multiprocessing
def write(q):
# multiprocessing.current_process().name
# multiprocessing.current_process().pid
# multiprocessing.current_process().is_alive()
print("Process to write: {}" .format(Process.pid))
for i in range(10):
print("Put {} to queue...".format(i))
q.put(i) # 把數(shù)字放到我們的隊(duì)列里面去
def read(q):
print("Process to read: {}" .format(Process.pid))
while True:
# 這里為什么要使用 while 呢?因?yàn)槲覀円粩嗟难h(huán),隊(duì)列當(dāng)中有可能沒有數(shù)據(jù),所以需要一直循環(huán)獲取。
# 當(dāng)然,你也可以直接指定循環(huán)的次數(shù)
value = q.get() # 獲取隊(duì)列中的數(shù)據(jù)(隊(duì)列中沒有數(shù)據(jù)就會(huì)阻塞在那里)
print("Get {} from queue." .format(value))
# 所以就有以下策略:一個(gè)線程抓取 url 放入隊(duì)列之中,另一個(gè)隊(duì)列解析
if __name__ == '__main__':
# 父進(jìn)程創(chuàng)建 Queue ,并傳給各個(gè)子進(jìn)程:
q = Queue() # 隊(duì)列
pw = Process(target=write, args=(q, ))
pr = Process(target=read, args=(q, ))
# 啟動(dòng)子進(jìn)程 pw ,寫入:
pw.start()
# 啟動(dòng)子進(jìn)程 pr, 讀?。?
pr.start()
# 等待 pw 結(jié)束
pw.join()
# 等待 pr 結(jié)束
pr.join()
舉個(gè)實(shí)操的小例子:
"""
project = 'Code', file_name = 'duoxianc', author = 'AI悅創(chuàng)'
time = '2020/4/20 11:31', product_name = PyCharm
# code is far away from bugs with the god animal protecting
I love animals. They taste delicious.
"""
import time
from multiprocessing import Process, Queue
import requests
from lxml import etree
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'}
def spider(q):
html = requests.get('http://www.budejie.com', headers = headers)
# print(html.text)
xml = etree.HTML(html.text)
user_name = xml.xpath('//div[@class="u-txt"]/a/text()')
# print(user_name)
q.put(user_name)
def parse(q):
while True:
time.sleep(0.1)
if not q.empty():
value = q.get()
print(f'value:{value}')
break
if __name__ == '__main__':
q = Queue()
sp = Process(target=spider, args=(q,))
pa = Process(target=parse, args=(q,))
sp.start()
pa.start()
sp.join()
pa.join()
12. 管道
剛才我們使用 Queue 實(shí)現(xiàn)了進(jìn)程間的數(shù)據(jù)共享,那么進(jìn)程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用 Pipe,管道。
管道,我們可以把它理解為兩個(gè)進(jìn)程之間通信的通道。管道可以是單向的,即 half-duplex:一個(gè)進(jìn)程負(fù)責(zé)發(fā)消息,另一個(gè)進(jìn)程負(fù)責(zé)收消息;也可以是雙向的 duplex,即互相收發(fā)消息。
默認(rèn)聲明 Pipe 對(duì)象是雙向管道,如果要?jiǎng)?chuàng)建單向管道,可以在初始化的時(shí)候傳入 deplex 參數(shù)為 False。
我們用一個(gè)實(shí)例來感受一下:
from multiprocessing import Process, Pipe
class Consumer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
self.pipe.send('Consumer Words')
print(f'Consumer Received: {self.pipe.recv()}')
class Producer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
print(f'Producer Received: {self.pipe.recv()}')
self.pipe.send('Producer Words')
if __name__ == '__main__':
pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print('Main Process Ended')
在這個(gè)例子里我們聲明了一個(gè)默認(rèn)為雙向的管道,然后將管道的兩端分別傳給兩個(gè)進(jìn)程。兩個(gè)進(jìn)程互相收發(fā)。觀察一下結(jié)果:
Producer Received: Consumer Words
Consumer Received: Producer Words
Main Process Ended
管道 Pipe 就像進(jìn)程之間搭建的橋梁,利用它我們就可以很方便地實(shí)現(xiàn)進(jìn)程間通信了。
13. 進(jìn)程池
為什么需要進(jìn)程池與線程池呢,我就用前面我們?cè)谶M(jìn)行上下文切換的時(shí)候會(huì)有資源消耗,而在這個(gè)基礎(chǔ)上,創(chuàng)建線程與刪除線程都是需要消耗更多的資源。而這個(gè)池就節(jié)省了資源消耗,這樣我們就不用進(jìn)行創(chuàng)建和銷毀了,只要獲取里面的使用即可。這里我主要講一下進(jìn)程池與線程池的簡(jiǎn)單用法。
在講之前,我們先看幾個(gè)簡(jiǎn)單的例子:
第一種方法(多任務(wù)):
from multiprocessing import Pool
def function_square(data):
result = data*data
return result
if __name__ == '__main__':
inputs = [i for i in range(100)]
# inputs = (i for i in range(100))
# inputs = list(range(100))
pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會(huì)根據(jù)你電腦狀態(tài),自行創(chuàng)建。
# 按你的電腦自動(dòng)創(chuàng)建相應(yīng)的數(shù)目
# map 把任務(wù)交給進(jìn)程池
# pool.map(function, iterable)
pool_outputs = pool.map(function_square, inputs)
# pool_outputs = pool.map(function_square, (2,3, 4, 5))
pool.close()
pool.join()
print("Pool :", pool_outputs)
第二種方法(單任務(wù)):
from multiprocessing import Pool
def function_square(data):
result = data*data
return result
if __name__ == '__main__':
pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會(huì)根據(jù)你電腦狀態(tài),自行創(chuàng)建。(按你的電腦自動(dòng)創(chuàng)建相應(yīng)的數(shù)目)
# map 把任務(wù)交給進(jìn)程池
# pool.map(function, iterable)
pool_outputs = pool.apply(function_square, args=(10, ))
pool.close()
pool.join()
print("Pool :", pool_outputs)
使用 from multiprocessing import Pool:引入進(jìn)程池 ,那這個(gè)進(jìn)程池,它是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請(qǐng)求提交到進(jìn)程池,如果這個(gè)進(jìn)程池還沒有滿的話,就創(chuàng)建新的進(jìn)程來執(zhí)行請(qǐng)求。 如果池滿的話,就會(huì)先等待。
# 那么,我們可以首先聲明這個(gè)進(jìn)程池;
# 然后,使用 map 方法,那其實(shí)這個(gè) map 方法和正常的 map 方法是一致的。
# map:
# pool = Pool()
# pool.map(main, [i*10 for i in range(10)])
# 第一個(gè)參數(shù):他會(huì)將數(shù)組中的每一個(gè)元素拿出來,當(dāng)作函數(shù)的一個(gè)個(gè)參數(shù),然后創(chuàng)建一個(gè)個(gè)進(jìn)程,放到進(jìn)程池里面去運(yùn)行。
# 第二個(gè)參數(shù):構(gòu)造一個(gè)數(shù)組,然后也就是 0 到 90 的這么一個(gè)循環(huán),那我們直接使用 list 構(gòu)造一下
接下來,我們來系統(tǒng)的講解一下,進(jìn)程池。
在前面,我們講了可以使用 Process 來創(chuàng)建進(jìn)程,同時(shí)也講了如何用 Semaphore 來控制進(jìn)程的并發(fā)執(zhí)行數(shù)量。
假如現(xiàn)在我們遇到這么一個(gè)問題,我有 10000 個(gè)任務(wù),每個(gè)任務(wù)需要啟動(dòng)一個(gè)進(jìn)程來執(zhí)行,并且一個(gè)進(jìn)程運(yùn)行完畢之后要緊接著啟動(dòng)下一個(gè)進(jìn)程,同時(shí)我還需要控制進(jìn)程的并發(fā)數(shù)量,不能并發(fā)太高,不然 CPU 處理不過來(如果同時(shí)運(yùn)行的進(jìn)程能維持在一個(gè)最高恒定值當(dāng)然利用率是最高的)。
那么我們?cè)撊绾蝸韺?shí)現(xiàn)這個(gè)需求呢?
用 Process 和 Semaphore 可以實(shí)現(xiàn),但是實(shí)現(xiàn)起來比較我們可以用 Process 和 Semaphore 解決問題,但是實(shí)現(xiàn)起來比較煩瑣。而這種需求在平時(shí)又是非常常見的。此時(shí),我們就可以派上進(jìn)程池了,即 multiprocessing 中的 Pool。
Pool 可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到 pool 中時(shí),如果池還沒有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來執(zhí)行它。
我們用一個(gè)實(shí)例來實(shí)現(xiàn)一下,代碼如下:
from multiprocessing import Pool
import time
def function(index):
print(f'Start process: {index}')
time.sleep(3)
print(f'End process {index}',)
if __name__ == '__main__':
pool = Pool(processes=3)# 不斷地改變?cè)摂?shù)字觀察輸出結(jié)果
for i in range(4):
pool.apply_async(function, args=(i, ))
print('Main Process started')
pool.close()
pool.join()
print("Main Process ended")
在這個(gè)例子中我們聲明了一個(gè)大小為 3 的進(jìn)程池,通過 processes 參數(shù)來指定,如果不指定,那么會(huì)自動(dòng)根據(jù)處理器內(nèi)核來分配進(jìn)程數(shù)。接著我們使用 apply_async 方法將進(jìn)程添加進(jìn)去,args 可以用來傳遞參數(shù)。
運(yùn)行結(jié)果如下:
Main Process started
Start process: 0
Start process: 1
Start process: 2
End process 0
End process 1
End process 2
Start process: 3
End process 3
Main Process ended
進(jìn)程池大小為 3,所以最初可以看到有 3 個(gè)進(jìn)程同時(shí)執(zhí)行,第進(jìn)程池大小為 3,所以最初可以看到有 3 個(gè)進(jìn)程同時(shí)執(zhí)行,第4個(gè)進(jìn)程在等待,在有進(jìn)程運(yùn)行完畢之后,第4個(gè)進(jìn)程馬上跟著運(yùn)行,出現(xiàn)了如上的運(yùn)行效果。
最后,我們要記得調(diào)用 close 方法來關(guān)閉進(jìn)程池,使其不再接受新的任務(wù),然后調(diào)用 join 方法讓主進(jìn)程等待子進(jìn)程的退出,等子進(jìn)程運(yùn)行完畢之后,主進(jìn)程接著運(yùn)行并結(jié)束。
不過上面的寫法多少有些煩瑣,這里再介紹進(jìn)程池一個(gè)更好用的 map 方法,可以將上述寫法簡(jiǎn)化很多。
map 方法是怎么用的呢?
第一個(gè)參數(shù)就是要啟動(dòng)的進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)是一個(gè)可迭代對(duì)象,其中的每個(gè)元素會(huì)被傳遞給這個(gè)執(zhí)行方法。
舉個(gè)例子:
現(xiàn)在我們有一個(gè) list,里面包含了很多 URL,另外我們也定義了一個(gè)方法用來抓取每個(gè) URL 內(nèi)容并解析,那么我們可以直接在 map 的第一個(gè)參數(shù)傳入方法名,第 2 個(gè)參數(shù)傳入 URL 數(shù)組。
我們用一個(gè)實(shí)例來感受一下:
from multiprocessing import Pool
import urllib.request
import urllib.error
def scrape(url):
try:
urllib.request.urlopen(url)
print(f'URL {url} Scraped')
except (urllib.error.HTTPError, urllib.error.URLError):
print(f'URL {url} not Scraped')
if __name__ == '__main__':
pool = Pool(processes=3)
urls = [
'https://www.baidu.com',
'http://www.meituan.com/',
'https://www.aiyc.top/',
'http://xxxyxxx.net',
]
pool.map(scrape, urls)
pool.close()
這個(gè)例子中我們先定義了一個(gè) scrape 方法,它接收一個(gè)參數(shù) url,這里就是請(qǐng)求了一下這個(gè)鏈接,然后輸出爬取成功的信息,如果發(fā)生錯(cuò)誤,則會(huì)輸出爬取失敗的信息。
首先我們要初始化一個(gè) Pool,指定進(jìn)程數(shù)為 3。然后我們聲明一個(gè) urls 列表,接著我們調(diào)用了 map 方法,第 1 個(gè)參數(shù)就是進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)就是 urls 列表,map 方法會(huì)依次將 urls 的每個(gè)元素作為 scrape 的參數(shù)傳遞并啟動(dòng)一個(gè)新的進(jìn)程,加到進(jìn)程池中執(zhí)行。
運(yùn)行結(jié)果如下:
URL https://www.baidu.com Scraped
URL http://xxxyxxx.net not Scraped
URL https://www.aiyc.top/ Scraped
URL http://www.meituan.com/ Scraped
這樣,我們就可以實(shí)現(xiàn) 3 個(gè)進(jìn)程并行運(yùn)行。不同的進(jìn)程相互獨(dú)立地輸出了對(duì)應(yīng)的爬取結(jié)果。
可以看到,我們利用 Pool 的 map 方法非常方便地實(shí)現(xiàn)了多進(jìn)程的執(zhí)行。后面我們也會(huì)在實(shí)戰(zhàn)案例中結(jié)合進(jìn)程池來實(shí)現(xiàn)數(shù)據(jù)的爬取。
以上便是 Python 中多進(jìn)程的基本用法,本節(jié)內(nèi)容比較多,后面的實(shí)戰(zhàn)案例也會(huì)用到這些內(nèi)容,需要好好掌握。
14. 實(shí)戰(zhàn)(貓眼 TOP100 + re + multiprocessing)
# !/usr/bin/python3
# -*- coding: utf-8 -*-
# @Author:AI悅創(chuàng) @DateTime :2020/2/12 15:23 @Function :功能 Development_tool :PyCharm
# code is far away from bugs with the god animal protecting
# I love animals. They taste delicious.
# https://maoyan.com/board/4?offset=0
# https://maoyan.com/board/4?offset=10
# https://maoyan.com/board/4?offset=20
# https://maoyan.com/board/4?offset=30
import requests,re,json
from requests.exceptions import RequestException
from multiprocessing import Pool # 引入進(jìn)程池
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36'
}
session = requests.Session()
session.headers = headers
def get_one_page(url):
try:
response = session.get(url)
if response.status_code == 200:
return response.text
return None
except RequestException:
return None
def parse_one_page(html):
pattern = re.compile('
.*?board-index.*?>(\d+).*?data-src="(.*?)".*?name.*?>
+'.*?>(.*?).*?star">(.*?)
.*?releasetime">(.*?)
.*?integer">'
+'(.*?).*?fraction">(.*?).*?
', re.S)
# 標(biāo)簽的開始和結(jié)尾都要寫出來!!!
items = re.findall(pattern, html)
# 使用 yield 把這個(gè)方法變成一個(gè)生成器
# 要把返回的結(jié)果做成一個(gè)鍵值對(duì)的形式
for item in items:
yield {
'index': item[0],
'image': item[1],
'title': item[2],
'actor': item[3].strip()[17:],
'time': item[4][5:],
'score': item[5]+item[6]
}
def write_to_file(content):
# print(type(content))
# with open('result.txt', 'a') as f:
with open('result.txt', 'a', encoding='utf-8') as f:
# 字典轉(zhuǎn)換成字符串
# f.write(json.dumps(content) + '\n') # 中文編碼變成 Unicode
f.write(json.dumps(content, ensure_ascii=False) + '\n')
f.close()
def main(offset):
url = f'https://maoyan.com/board/4?offset={offset}'
html = get_one_page(url)
for item in parse_one_page(html):
print(item)
write_to_file(item)
# 1.0
# if __name__ == '__main__':
# for i in range(10): # range(0, 100, 10)
# main(i*10)
# 2.0
if __name__ == '__main__':
pool = Pool()
pool.map(main, [i*10 for i in range(10)])
# 優(yōu)化,如果你要秒抓的話,使用 from multiprocessing import Pool # 引入進(jìn)程池 ,當(dāng)然我們目的不是秒抓,而是學(xué)習(xí)一下多進(jìn)程的用法
# 那么這個(gè)進(jìn)程池,他是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請(qǐng)求提交到進(jìn)程池,如果這個(gè)進(jìn)程池還沒有滿的話,就創(chuàng)建新的進(jìn)程來執(zhí)行請(qǐng)求。
# 如果池滿的話,就會(huì)先等待
# 那么,我們可以首先聲明這個(gè)進(jìn)程池;
# 然后,使用 map 方法,那其實(shí)這個(gè) map 方法和正常的 map 方法是一致的。
# map:
# pool = Pool()
# pool.map(main, [i*10 for i in range(10)])
# 第一個(gè)參數(shù):他會(huì)將數(shù)組中的每一個(gè)元素拿出來,當(dāng)作函數(shù)的一個(gè)個(gè)參數(shù),然后創(chuàng)建一個(gè)個(gè)進(jìn)程,放到進(jìn)程池里面去運(yùn)行。
# 第二個(gè)參數(shù):構(gòu)造一個(gè)數(shù)組,然后也就是 0 到 90 的這么一個(gè)循環(huán),那我們直接使用 list 構(gòu)造一下
15. 補(bǔ)充:線程池
我找了許多包,這個(gè)包還是不錯(cuò)的:Pip install threadpool
# project = 'Code', file_name = '線程池', author = 'AI悅創(chuàng)'
# time = '2020/3/3 0:05', product_name = PyCharm
# code is far away from bugs with the god animal protecting
# I love animals. They taste delicious.
import time
import threadpool
# 執(zhí)行比較耗時(shí)的函數(shù),需要開多線程
def get_html(url):
time.sleep(3)
print(url)
# 按原本的單線程運(yùn)行時(shí)間為:300s
# 而多線程池的化:30s
# 使用多線程執(zhí)行 telent 函數(shù)
urls = [i for i in range(100)]
pool = threadpool.ThreadPool(10) # 建立線程池
# 提交任務(wù)給線程池
requests = threadpool.makeRequests(get_html, urls)
# 開始執(zhí)行任務(wù)
for req in requests:
pool.putRequest(req)
pool.wait()