當(dāng)前位置:首頁(yè) > 技術(shù)學(xué)院 > 技術(shù)前線
[導(dǎo)讀]在上一課時(shí)我們了解了多線程的基本概念,同時(shí)我們也提到,Python 中的多線程是不能很好發(fā)揮多核優(yōu)勢(shì)的,如果想要發(fā)揮多核優(yōu)勢(shì),最好還是使用多進(jìn)程。 那么本課時(shí)我們就來(lái)了解下多進(jìn)程的基本概念和用 Python 實(shí)現(xiàn)多進(jìn)程的方法。

1. 多進(jìn)程的含義

進(jìn)程(Process)是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位。

顧名思義,多進(jìn)程就是啟用多個(gè)進(jìn)程同時(shí)運(yùn)行。由于進(jìn)程是線程的集合,而且進(jìn)程是由一個(gè)或多個(gè)線程構(gòu)成的,所以多進(jìn)程的運(yùn)行意味著有大于或等于進(jìn)程數(shù)量的線程在運(yùn)行。

2. Python 多進(jìn)程的優(yōu)勢(shì)

通過(guò)上一課時(shí)我們知道,由于進(jìn)程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢(shì),一個(gè)進(jìn)程中的多個(gè)線程,在同一時(shí)刻只能有一個(gè)線程運(yùn)行。

而對(duì)于多進(jìn)程來(lái)說(shuō),每個(gè)進(jìn)程都有屬于自己的 GIL,所以,在多核處理器下,多進(jìn)程的運(yùn)行是不會(huì)受 GIL 的影響的。因此,多進(jìn)程能更好地發(fā)揮多核的優(yōu)勢(shì)。

當(dāng)然,對(duì)于爬蟲(chóng)這種 IO 密集型任務(wù)來(lái)說(shuō),多線程和多進(jìn)程影響差別并不大。對(duì)于計(jì)算密集型任務(wù)來(lái)說(shuō),Python 的多進(jìn)程相比多線程,其多核運(yùn)行效率會(huì)有成倍的提升。

總的來(lái)說(shuō),Python 的多進(jìn)程整體來(lái)看是比多線程更有優(yōu)勢(shì)的。所以,在條件允許的情況下,能用多進(jìn)程就盡量用多進(jìn)程。

不過(guò)值得注意的是,由于進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位,所以各個(gè)進(jìn)程之間的數(shù)據(jù)是無(wú)法共享的,如多個(gè)進(jìn)程無(wú)法共享一個(gè)全局變量,進(jìn)程之間的數(shù)據(jù)共享需要有單獨(dú)的機(jī)制來(lái)實(shí)現(xiàn),這在后面也會(huì)講到。

3. 多進(jìn)程的實(shí)現(xiàn)

在 Python 中也有內(nèi)置的庫(kù)來(lái)實(shí)現(xiàn)多進(jìn)程,它就是 multiprocessing。多線程在 IO 密集型用的比較多,也就是在爬蟲(chóng)方面用的比較多。而 CPU 密集型根本就不用多線程。

我們一般的策略是,多進(jìn)程加多線程,這樣的結(jié)合是最好。

multiprocessing 提供了一系列的組件,如 Process(進(jìn)程)、Queue(隊(duì)列)、Semaphore(信號(hào)量)、Pipe(管道)、Lock(鎖)、Pool(進(jìn)程池)等,接下來(lái)讓我們來(lái)了解下它們的使用方法。

4. 直接使用 Process 類

在 multiprocessing 中,每一個(gè)進(jìn)程都用一個(gè) Process 類來(lái)表示。它的 API 調(diào)用如下:

Process([group [, target [, name [, args [, kwargs]]]]])

target 表示調(diào)用對(duì)象,你可以傳入方法的名字。

args 表示被調(diào)用對(duì)象的位置參數(shù)元組,比如 target 是函數(shù) func,他有兩個(gè)參數(shù) m,n,那么 args 就傳入 [m, n] 即可。

kwargs 表示調(diào)用對(duì)象的字典。

name 是別名,相當(dāng)于給這個(gè)進(jìn)程取一個(gè)名字。

group 分組。

我們先用一個(gè)實(shí)例來(lái)感受一下:

# 示例一:

import multiprocessing

def process(index):

print(f"Process: {index}")

if __name__ == '__main__':

for i in range(5):

p = multiprocessing.Process(target=process, args=(i,))

p.start()

# 示例二

import multiprocessing

import time

def start(i):

time.sleep(3)

print(i)

# current process

# 當(dāng)前進(jìn)程

print(multiprocessing.current_process().name) # 當(dāng)前進(jìn)程的名字

print(multiprocessing.current_process().pid) # 進(jìn)程控制符

print(multiprocessing.current_process().is_alive()) # 判斷進(jìn)程是否存活

# 因?yàn)?,我們有些進(jìn)程卡死,所以我就要自己把進(jìn)程卡死

if __name__ == '__main__':

print('start')

p = multiprocessing.Process(target=start, args=(1,), name='p1')

p.start()

print('stop')

這是一個(gè)實(shí)現(xiàn)多進(jìn)程最基礎(chǔ)的方式:通過(guò)創(chuàng)建 Process 來(lái)新建一個(gè)子進(jìn)程,其中 target 參數(shù)傳入方法名,args 是方法的參數(shù),是以元組的形式傳入,其和被調(diào)用的方法 process 的參數(shù)是一一對(duì)應(yīng)的。

注意:這里 args 必須要是一個(gè)元組,如果只有一個(gè)參數(shù),那也要在元組第一個(gè)元素后面加一個(gè)逗號(hào),如果沒(méi)有逗號(hào)則和單個(gè)元素本身沒(méi)有區(qū)別,無(wú)法構(gòu)成元組,導(dǎo)致參數(shù)傳遞出現(xiàn)問(wèn)題。

創(chuàng)建完進(jìn)程之后,我們通過(guò)調(diào)用 start 方法即可啟動(dòng)進(jìn)程了。示例一運(yùn)行結(jié)果如下:

Process: 0

Process: 1

Process: 2

Process: 3

Process: 4

可以看到,我們運(yùn)行了 5 個(gè)子進(jìn)程,每個(gè)進(jìn)程都調(diào)用了 process方法。process方法的 index參數(shù)通過(guò) Process的 args傳入,分別是0~4 這 5 個(gè)序號(hào),最后打印出來(lái),5 個(gè)子進(jìn)程運(yùn)行結(jié)束。

由于進(jìn)程是 Python 中最小的資源分配單元,因此這些進(jìn)程和線程不同,各個(gè)進(jìn)程之間的數(shù)據(jù)是不會(huì)共享的,每啟動(dòng)一個(gè)進(jìn)程,都會(huì)獨(dú)立分配資源。

另外,在當(dāng)前 CPU 核數(shù)足夠的情況下,這些不同的進(jìn)程會(huì)分配給不同的 CPU核來(lái)運(yùn)行,實(shí)現(xiàn)真正的并行執(zhí)行。

multiprocessing還提供了幾個(gè)比較有用的方法,如我們可以通過(guò) cpu_count 的方法來(lái)獲取當(dāng)前機(jī)器 CPU的核心數(shù)量,通過(guò) active_children方法獲取當(dāng)前還在運(yùn)行的所有進(jìn)程。

下面通過(guò)一個(gè)實(shí)例來(lái)看一下:

import multiprocessing

import time

def process(index):

time.sleep(index)

print(f"Process: {index}")

if __name__ == '__main__':

for i in range(5):

p = multiprocessing.Process(target=process, args=[i,])

p.start()

print(f"CPU number: {multiprocessing.cpu_count()}")

for p in multiprocessing.active_children():

print(f"Child process name: {p.name} id: {p.pid}")

print("Process Ended")

運(yùn)行結(jié)果如下:

Process: 0

CPU number: 8

Child process name: Process-5 id: 73595

Child process name: Process-2 id: 73592

Child process name: Process-3 id: 73593

Child process name: Process-4 id: 73594

Process Ended

Process: 1

Process: 2

Process: 3

Process: 4

在上面的例子中我們通過(guò) cpu_count成功獲取了 CPU 核心的數(shù)量:8 個(gè),當(dāng)然不同的機(jī)器結(jié)果可能不同。

另外我們還通過(guò) active_children獲取到了當(dāng)前正在活躍運(yùn)行的進(jìn)程列表。然后我們遍歷了每個(gè)進(jìn)程,并將它們的名稱和進(jìn)程號(hào)打印出來(lái)了,這里進(jìn)程號(hào)直接使用 pid屬性即可獲取,進(jìn)程名稱直接通過(guò) name 屬性即可獲取。

以上我們就完成了多進(jìn)程的創(chuàng)建和一些基本信息的獲取。

5. 繼承 Process 類

在上面的例子中,我們創(chuàng)建進(jìn)程是直接使用 Process這個(gè)類來(lái)創(chuàng)建的,這是一種創(chuàng)建進(jìn)程的方式。不過(guò),創(chuàng)建進(jìn)程的方式不止這一種,同樣,我們也可以像線程 Thread一樣來(lái)通過(guò)繼承的方式創(chuàng)建一個(gè)進(jìn)程類,進(jìn)程的基本操作我們?cè)谧宇惖?run方法中實(shí)現(xiàn)即可。

通過(guò)一個(gè)實(shí)例來(lái)看一下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

for i in range(2, 5):

p = MyProcess(i)

p.start()

我們首先聲明了一個(gè)構(gòu)造方法,這個(gè)方法接收一個(gè) loop參數(shù),代表循環(huán)次數(shù),并將其設(shè)置為全局變量。在 run方法中,又使用這個(gè) loop變量循環(huán)了 loop次并打印了當(dāng)前的進(jìn)程號(hào)和循環(huán)次數(shù)。

在調(diào)用時(shí),我們用 range 方法得到了 2、3、4 三個(gè)數(shù)字,并把它們分別初始化了 MyProcess 進(jìn)程,然后調(diào)用 start 方法將進(jìn)程啟動(dòng)起來(lái)。

注意:這里進(jìn)程的執(zhí)行邏輯需要在 run 方法中實(shí)現(xiàn),啟動(dòng)進(jìn)程需要調(diào)用 start 方法,調(diào)用之后 run 方法便會(huì)執(zhí)行。

運(yùn)行結(jié)果如下:

Pid: 73667 LoopCount: 0

Pid: 73668 LoopCount: 0

Pid: 73669 LoopCount: 0

Pid: 73667 LoopCount: 1

Pid: 73668 LoopCount: 1

Pid: 73669 LoopCount: 1

Pid: 73668 LoopCount: 2

Pid: 73669 LoopCount: 2

Pid: 73669 LoopCount: 3

可以看到,三個(gè)進(jìn)程分別打印出了 2、3、4 條結(jié)果,即進(jìn)程 73667 打印了 2 次 結(jié)果,進(jìn)程 73668 打印了 3 次結(jié)果,進(jìn)程 73669 打印了 4 次結(jié)果。

注意,這里的進(jìn)程 pid 代表進(jìn)程號(hào),不同機(jī)器、不同時(shí)刻運(yùn)行結(jié)果可能不同。

通過(guò)上面的方式,我們也非常方便地實(shí)現(xiàn)了一個(gè)進(jìn)程的定義。為了復(fù)用方便,我們可以把一些方法寫(xiě)在每個(gè)進(jìn)程類里封裝好,在使用時(shí)直接初始化一個(gè)進(jìn)程類運(yùn)行即可。

PID(進(jìn)程控制符)英文全稱為 Process Identifier,它也屬于電工電子類技術(shù)術(shù)語(yǔ)。

PID就是各進(jìn)程的身份標(biāo)識(shí),程序一運(yùn)行系統(tǒng)就會(huì)自動(dòng)分配給進(jìn)程一個(gè)獨(dú)一無(wú)二的PID。進(jìn)程中止后PID被系統(tǒng)回收,可能會(huì)被繼續(xù)分配給新運(yùn)行的程序。

PID一列代表了各進(jìn)程的進(jìn)程ID,也就是說(shuō),PID就是各進(jìn)程的身份標(biāo)識(shí)。

在實(shí)際調(diào)試中,只能先大致設(shè)定一個(gè)經(jīng)驗(yàn)值,然后根據(jù)調(diào)節(jié)效果修改。

6. 守護(hù)進(jìn)程

在多進(jìn)程中,同樣存在守護(hù)進(jìn)程的概念,如果一個(gè)進(jìn)程被設(shè)置為守護(hù)進(jìn)程,當(dāng)父進(jìn)程結(jié)束后,子進(jìn)程會(huì)自動(dòng)被終止,我們可以通過(guò)設(shè)置 daemon 屬性來(lái)控制是否為守護(hù)進(jìn)程。

還是原來(lái)的例子,增加了 deamon屬性的設(shè)置:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

for i in range(2, 5):

p = MyProcess(i)

p.daemon = True

p.start()

print("Main Process ended")

運(yùn)行結(jié)果如下:

Main Process ended

結(jié)果很簡(jiǎn)單,因?yàn)橹鬟M(jìn)程沒(méi)有做任何事情,直接輸出一句話結(jié)束,所以在這時(shí)也直接終止了子進(jìn)程的運(yùn)行。

這樣可以有效防止無(wú)控制地生成子進(jìn)程。這樣的寫(xiě)法可以讓我們?cè)谥鬟M(jìn)程運(yùn)行結(jié)束后無(wú)需額外擔(dān)心子進(jìn)程是否關(guān)閉,避免了獨(dú)立子進(jìn)程的運(yùn)行。

7. 進(jìn)程等待

上面的運(yùn)行效果其實(shí)不太符合我們預(yù)期:主進(jìn)程運(yùn)行結(jié)束時(shí),子進(jìn)程(守護(hù)進(jìn)程)也都退出了,子進(jìn)程什么都沒(méi)來(lái)得及執(zhí)行。

能不能讓所有子進(jìn)程都執(zhí)行完了然后再結(jié)束呢?當(dāng)然是可以的,只需要加入 join 方法即可,我們可以將代碼改寫(xiě)如下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

print("Main Process start")

processes = []

for i in range(2, 5):

p = MyProcess(i)

processes.append(p)

p.daemon = True

p.start()

for p in processes:

p.join()

print("Main Process ended")

運(yùn)行結(jié)果如下:

Main Process start

Pid: 11776 LoopCount: 0

Pid: 16824 LoopCount: 0

Pid: 10552 LoopCount: 0

Pid: 11776 LoopCount: 1

Pid: 16824 LoopCount: 1

Pid: 10552 LoopCount: 1

Pid: 16824 LoopCount: 2

Pid: 10552 LoopCount: 2

Pid: 10552 LoopCount: 3

Main Process ended

在調(diào)用 start和 join方法后,父進(jìn)程就可以等待所有子進(jìn)程都執(zhí)行完畢后,再打印出結(jié)束的結(jié)果。

默認(rèn)情況下,join是無(wú)限期的。也就是說(shuō),如果有子進(jìn)程沒(méi)有運(yùn)行完畢,主進(jìn)程會(huì)一直等待。這種情況下,如果子進(jìn)程出現(xiàn)問(wèn)題陷入了死循環(huán),主進(jìn)程也會(huì)無(wú)限等待下去。

怎么解決這個(gè)問(wèn)題呢?

可以給 join 方法傳遞一個(gè)超時(shí)參數(shù),代表最長(zhǎng)等待秒數(shù)。如果子進(jìn)程沒(méi)有在這個(gè)指定秒數(shù)之內(nèi)完成,會(huì)被強(qiáng)制返回,主進(jìn)程不再會(huì)等待。也就是說(shuō)這個(gè)參數(shù)設(shè)置了主進(jìn)程等待該子進(jìn)程的最長(zhǎng)時(shí)間。

例如這里我們傳入 1,代表最長(zhǎng)等待 1 秒,代碼改寫(xiě)如下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

processes = []

for i in range(3, 5):

p = MyProcess(i)

processes.append(p)

p.daemon = True

p.start()

for p in processes:

p.join(1)

print("Main Process ended")

運(yùn)行結(jié)果如下:

Pid: 40970 LoopCount: 0

Pid: 40971 LoopCount: 0

Pid: 40970 LoopCount: 1

Pid: 40971 LoopCount: 1

Main Process ended

可以看到,有的子進(jìn)程本來(lái)要運(yùn)行 3 秒,結(jié)果運(yùn)行 1 秒就被強(qiáng)制返回了,由于是守護(hù)進(jìn)程,該子進(jìn)程被終止了。

到這里,我們就了解了守護(hù)進(jìn)程、進(jìn)程等待和超時(shí)設(shè)置的用法。

8. 終止進(jìn)程

當(dāng)然,終止進(jìn)程不止有守護(hù)進(jìn)程這一種做法,我們也可以通過(guò) terminate 方法來(lái)終止某個(gè)子進(jìn)程,另外我們還可以通過(guò) is_alive方法判斷進(jìn)程是否還在運(yùn)行。

下面我們來(lái)看一個(gè)實(shí)例:

import multiprocessing, time

def process():

print("Starting")

time.sleep(5)

print("Finished")

if __name__ == '__main__':

p = multiprocessing.Process(target=process)

print("Before:", p, p.is_alive())

p.start()

print("During:", p, p.is_alive())

p.terminate()

print("Terminate:", p, p.is_alive())

p.join()

print("Joined:", p, p.is_alive())

在上面的例子中,我們用 Process創(chuàng)建了一個(gè)進(jìn)程,接著調(diào)用 start方法啟動(dòng)這個(gè)進(jìn)程,然后調(diào)用 terminate方法將進(jìn)程終止,最后調(diào)用 join方法。

另外,在進(jìn)程運(yùn)行不同的階段,我們還通過(guò) is_alive方法判斷當(dāng)前進(jìn)程是否還在運(yùn)行。

運(yùn)行結(jié)果如下:

Before: False

During: True

Terminate: True

Joined: False

這里有一個(gè)值得注意的地方,在調(diào)用 terminate 方法之后,我們用 is_alive 方法獲取進(jìn)程的狀態(tài)發(fā)現(xiàn)依然還是運(yùn)行狀態(tài)。在調(diào)用 join 方法之后,is_alive 方法獲取進(jìn)程的運(yùn)行狀態(tài)才變?yōu)榻K止?fàn)顟B(tài)。

所以,在調(diào)用 terminate 方法之后,記得要調(diào)用一下 join 方法,這里調(diào)用 join 方法可以為進(jìn)程提供時(shí)間來(lái)更新對(duì)象狀態(tài),用來(lái)反映出最終的進(jìn)程終止效果。

9. 進(jìn)程互斥鎖

在上面的一些實(shí)例中,我們可能會(huì)遇到如下的運(yùn)行結(jié)果:

Pid: 73993 LoopCount: 0

Pid: 73993 LoopCount: 1

Pid: 73994 LoopCount: 0Pid: 73994 LoopCount: 1

Pid: 73994 LoopCount: 2

Pid: 73995 LoopCount: 0

Pid: 73995 LoopCount: 1

Pid: 73995 LoopCount: 2

Pid: 73995 LoopCount: 3

Main Process ended

我們發(fā)現(xiàn),有的輸出結(jié)果沒(méi)有換行。這是什么原因造成的呢?

這種情況是由多個(gè)進(jìn)程 并行執(zhí)行 導(dǎo)致的,兩個(gè)進(jìn)程同時(shí)進(jìn)行了輸出,結(jié)果第一個(gè)進(jìn)程的換行沒(méi)有來(lái)得及輸出,第二個(gè)進(jìn)程就輸出了結(jié)果,導(dǎo)致最終輸出沒(méi)有換行。

那如何來(lái)避免這種問(wèn)題?

如果我們能保證,多個(gè)進(jìn)程運(yùn)行期間的任一時(shí)間,只能一個(gè)進(jìn)程輸出,其他進(jìn)程等待,等剛才那個(gè)進(jìn)程輸出完畢之后,另一個(gè)進(jìn)程再進(jìn)行輸出,這樣就不會(huì)出現(xiàn)輸出沒(méi)有換行的現(xiàn)象了。

這種解決方案實(shí)際上就是實(shí)現(xiàn)了進(jìn)程互斥,避免了多個(gè)進(jìn)程同時(shí)搶占臨界區(qū)(輸出)資源。

我們可以通過(guò) multiprocessing 中的 Lock 來(lái)實(shí)現(xiàn)。Lock ,即鎖,在一個(gè)進(jìn)程輸出時(shí),加鎖,其他進(jìn)程等待。等此進(jìn)程執(zhí)行結(jié)束后,釋放鎖,其他進(jìn)程可以進(jìn)行輸出。

我們首先實(shí)現(xiàn)一個(gè)不加鎖的實(shí)例,代碼如下:

from multiprocessing import Process, Lock

import time

class MyProcess(Process):

def __init__(self, loop, lock):

Process.__init__(self)

self.loop = loop

self.lock = lock

def run(self):

for count in range(self.loop):

time.sleep(0.1)

# self.lock.acquire()

print(f"Pid: {self.pid} LoopCount: {count}")

# self.lock.release()

if __name__ == '__main__':

lock = Lock()

for i in range(10, 15):

p = MyProcess(i, lock)

p.start()

運(yùn)行結(jié)果如下:

Pid: 74030 LoopCount: 0

Pid: 74031 LoopCount: 0

Pid: 74032 LoopCount: 0

Pid: 74033 LoopCount: 0

Pid: 74034 LoopCount: 0

Pid: 74030 LoopCount: 1

Pid: 74031 LoopCount: 1

Pid: 74032 LoopCount: 1Pid: 74033 LoopCount: 1

Pid: 74034 LoopCount: 1

Pid: 74030 LoopCount: 2

...

可以看到運(yùn)行結(jié)果中有些輸出已經(jīng)出現(xiàn)了不換行的問(wèn)題。

我們對(duì)其加鎖,取消掉剛才代碼中的兩行注釋,重新運(yùn)行,運(yùn)行結(jié)果如下:

Pid: 74061 LoopCount: 0

Pid: 74062 LoopCount: 0

Pid: 74063 LoopCount: 0

Pid: 74064 LoopCount: 0

Pid: 74065 LoopCount: 0

Pid: 74061 LoopCount: 1

Pid: 74062 LoopCount: 1

Pid: 74063 LoopCount: 1

Pid: 74064 LoopCount: 1

Pid: 74065 LoopCount: 1

Pid: 74061 LoopCount: 2

Pid: 74062 LoopCount: 2

Pid: 74064 LoopCount: 2

...

這時(shí)輸出效果就正常了。

所以,在訪問(wèn)一些臨界區(qū)資源時(shí),使用 Lock可以有效避免進(jìn)程同時(shí)占用資源而導(dǎo)致的一些問(wèn)題。

10. 信號(hào)量

信號(hào)量(Semaphore),有時(shí)被稱為信號(hào)燈,是在多線程環(huán)境下使用的一種設(shè)施,是可以用來(lái)保證兩個(gè)或多個(gè)關(guān)鍵代碼段不被并發(fā)調(diào)用。在進(jìn)入一個(gè)關(guān)鍵代碼段之前,線程必須獲取一個(gè)信號(hào)量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號(hào)量。其它想進(jìn)入該關(guān)鍵代碼段的線程必須等待直到第一個(gè)線程釋放信號(hào)量。為了完成這個(gè)過(guò)程,需要?jiǎng)?chuàng)建一個(gè)信號(hào)量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個(gè)關(guān)鍵代碼段的首末端。確認(rèn)這些信號(hào)量VI引用的是初始創(chuàng)建的信號(hào)

以一個(gè)停車(chē)場(chǎng)的運(yùn)作為例。簡(jiǎn)單起見(jiàn),假設(shè)停車(chē)場(chǎng)只有三個(gè)車(chē)位,一開(kāi)始三個(gè)車(chē)位都是空的。這時(shí)如果同時(shí)來(lái)了五輛車(chē),看門(mén)人允許其中三輛直接進(jìn)入,然后放下車(chē)攔,剩下的車(chē)則必須在入口等待,此后來(lái)的車(chē)也都不得不在入口處等待。這時(shí),有一輛車(chē)離開(kāi)停車(chē)場(chǎng),看門(mén)人得知后,打開(kāi)車(chē)攔,放入外面的一輛進(jìn)去,如果又離開(kāi)兩輛,則又可以放入兩輛,如此往復(fù)。

在這個(gè)停車(chē)場(chǎng)系統(tǒng)中,車(chē)位是公共資源,每輛車(chē)好比一個(gè)線程,看門(mén)人起的就是信號(hào)量的作用。

進(jìn)程互斥鎖可以使同一時(shí)刻只有一個(gè)進(jìn)程能訪問(wèn)共享資源,如上面的例子所展示的那樣,在同一時(shí)刻只能有一個(gè)進(jìn)程輸出結(jié)果。但有時(shí)候我們需要允許多個(gè)進(jìn)程來(lái)訪問(wèn)共享資源,同時(shí)還需要限制能訪問(wèn)共享資源的進(jìn)程的數(shù)量。

這種需求該如何實(shí)現(xiàn)呢?

可以用信號(hào)量,信號(hào)量是進(jìn)程同步過(guò)程中一個(gè)比較重要的角色。它可以控制臨界資源的數(shù)量,實(shí)現(xiàn)多個(gè)進(jìn)程同時(shí)訪問(wèn)共享資源,限制進(jìn)程的并發(fā)量。

如果你學(xué)過(guò)操作系統(tǒng),那么一定對(duì)這方面非常了解,如果你還不了解信號(hào)量是什么,可以先熟悉一下這個(gè)概念。

我們可以用 multiprocessing 庫(kù)中的 Semaphore 來(lái)實(shí)現(xiàn)信號(hào)量。

那么接下來(lái)我們就用一個(gè)實(shí)例來(lái)演示一下進(jìn)程之間利用 Semaphore做到多個(gè)進(jìn)程共享資源,同時(shí)又限制同時(shí)可訪問(wèn)的進(jìn)程數(shù)量,代碼如下:

from multiprocessing import Process, Semaphore, Lock, Queue

import time

buffer = Queue(10)

empty = Semaphore(2)

full = Semaphore(0)

lock = Lock()

class Consumer(Process):

def run(self):

global buffer, empty, full, lock

while True:

full.acquire()

lock.acquire()

buffer.get()

print('Consumer pop an element')

time.sleep(1)

lock.release()

empty.release()

class Producer(Process):

def run(self):

global buffer, empty, full, lock

while True:

empty.acquire()

lock.acquire()

buffer.put(1)

print('Producer append an element')

time.sleep(1)

lock.release()

full.release()

if __name__ == '__main__':

p = Producer()

c = Consumer()

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print("Main Process Ended")

如上代碼實(shí)現(xiàn)了經(jīng)典的生產(chǎn)者和消費(fèi)者問(wèn)題。它定義了兩個(gè)進(jìn)程類,一個(gè)是消費(fèi)者,一個(gè)是生產(chǎn)者。

另外,這里使用 multiprocessing 中的 Queue 定義了一個(gè)共享隊(duì)列,然后定義了兩個(gè)信號(hào)量 Semaphore,一個(gè)代表緩沖區(qū)空余數(shù),一個(gè)表示緩沖區(qū)占用數(shù)。

生產(chǎn)者 Producer使用 acquire方法來(lái)占用一個(gè)緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來(lái)進(jìn)行加鎖,對(duì)緩沖區(qū)進(jìn)行操作,然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費(fèi)者則相反。

運(yùn)行結(jié)果如下:

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

我們發(fā)現(xiàn)兩個(gè)進(jìn)程在交替運(yùn)行,生產(chǎn)者先放入緩沖區(qū)物品,然后消費(fèi)者取出,不停地進(jìn)行循環(huán)。 你可以通過(guò)上面的例子來(lái)體會(huì)信號(hào)量 Semaphore 的用法,通過(guò) Semaphore 我們很好地控制了進(jìn)程對(duì)資源的并發(fā)訪問(wèn)數(shù)量。

11. 隊(duì)列

在上面的例子中我們使用 Queue作為進(jìn)程通信的共享隊(duì)列使用。

而如果我們把上面程序中的 Queue 換成普通的 list,是完全起不到效果的,因?yàn)檫M(jìn)程和進(jìn)程之間的資源是不共享的。即使在一個(gè)進(jìn)程中改變了這個(gè) list,在另一個(gè)進(jìn)程也不能獲取到這個(gè) list 的狀態(tài),所以聲明全局變量對(duì)多進(jìn)程是沒(méi)有用處的。

那進(jìn)程如何共享數(shù)據(jù)呢?

可以用 Queue,即隊(duì)列。當(dāng)然這里的隊(duì)列指的是 multiprocessing里面的 Queue。

依然用上面的例子,我們一個(gè)進(jìn)程向隊(duì)列中放入隨機(jī)數(shù)據(jù),然后另一個(gè)進(jìn)程取出數(shù)據(jù)。

from multiprocessing import Process, Semaphore, Lock, Queue

import time

from random import random

buffer = Queue(10)

empty = Semaphore(2)

full = Semaphore(0)

lock = Lock()

class Consumer(Process):

def run(self):

global buffer, empty, full, lock

while True:

full.acquire()

lock.acquire()

print(f'Consumer get {buffer.get()}')

time.sleep(1)

lock.release()

empty.release()

class Producer(Process):

def run(self):

global buffer, empty, full, lock

while True:

empty.acquire()

lock.acquire()

num = random()

print(f'Producer put {num}')

buffer.put(num)

time.sleep(1)

lock.release()

full.release()

if __name__ == '__main__':

p = Producer()

c = Consumer()

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print("Main Process Ended")

運(yùn)行結(jié)果如下:

Producer put 0.719213647437

Producer put 0.44287326683

Consumer get 0.719213647437

Consumer get 0.44287326683

Producer put 0.722859424381

Producer put 0.525321338921

Consumer get 0.722859424381

Consumer get 0.525321338921

·······

在上面的例子中我們聲明了兩個(gè)進(jìn)程,一個(gè)進(jìn)程為生產(chǎn)者 Producer,另一個(gè)為消費(fèi)者 Consumer,生產(chǎn)者不斷向 Queue里面添加隨機(jī)數(shù),消費(fèi)者不斷從隊(duì)列里面取隨機(jī)數(shù)。

生產(chǎn)者在放數(shù)據(jù)的時(shí)候調(diào)用了 Queue 的 put 方法,消費(fèi)者在取的時(shí)候使用了 get 方法,這樣我們就通過(guò) Queue 實(shí)現(xiàn)兩個(gè)進(jìn)程的數(shù)據(jù)共享了。

上面的例子有些不好理解,接下來(lái),我們來(lái)用個(gè)比較簡(jiǎn)單的例子:

Python 多進(jìn)程之間是默認(rèn)無(wú)法通信的,因?yàn)槭遣⑿袌?zhí)行的。所以需要借助其他數(shù)據(jù)結(jié)構(gòu)。

舉個(gè)例子:

你一個(gè)進(jìn)程抓取到數(shù)據(jù),要給另一個(gè)進(jìn)程用,就需要進(jìn)程通信。

隊(duì)列:就像排隊(duì)一樣,先進(jìn)先出。也就是你先放進(jìn)去的數(shù)據(jù),也就先取出數(shù)據(jù)。

棧:主要用在 C 和 C++ 上的數(shù)據(jù)結(jié)構(gòu)。主要存儲(chǔ)用戶自定義的數(shù)據(jù)。它是后進(jìn)先出。先進(jìn)去的墊在底層,后進(jìn)的在上面。

from multiprocessing import Process, Queue

# Process :進(jìn)程

# Queue :隊(duì)列

# import multiprocessing

def write(q):

# multiprocessing.current_process().name

# multiprocessing.current_process().pid

# multiprocessing.current_process().is_alive()

print("Process to write: {}" .format(Process.pid))

for i in range(10):

print("Put {} to queue...".format(i))

q.put(i) # 把數(shù)字放到我們的隊(duì)列里面去

def read(q):

print("Process to read: {}" .format(Process.pid))

while True:

# 這里為什么要使用 while 呢?因?yàn)槲覀円粩嗟难h(huán),隊(duì)列當(dāng)中有可能沒(méi)有數(shù)據(jù),所以需要一直循環(huán)獲取。

# 當(dāng)然,你也可以直接指定循環(huán)的次數(shù)

value = q.get() # 獲取隊(duì)列中的數(shù)據(jù)(隊(duì)列中沒(méi)有數(shù)據(jù)就會(huì)阻塞在那里)

print("Get {} from queue." .format(value))

# 所以就有以下策略:一個(gè)線程抓取 url 放入隊(duì)列之中,另一個(gè)隊(duì)列解析

if __name__ == '__main__':

# 父進(jìn)程創(chuàng)建 Queue ,并傳給各個(gè)子進(jìn)程:

q = Queue() # 隊(duì)列

pw = Process(target=write, args=(q, ))

pr = Process(target=read, args=(q, ))

# 啟動(dòng)子進(jìn)程 pw ,寫(xiě)入:

pw.start()

# 啟動(dòng)子進(jìn)程 pr, 讀?。?

pr.start()

# 等待 pw 結(jié)束

pw.join()

# 等待 pr 結(jié)束

pr.join()

舉個(gè)實(shí)操的小例子:

"""

project = 'Code', file_name = 'duoxianc', author = 'AI悅創(chuàng)'

time = '2020/4/20 11:31', product_name = PyCharm

# code is far away from bugs with the god animal protecting

I love animals. They taste delicious.

"""

import time

from multiprocessing import Process, Queue

import requests

from lxml import etree

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'}

def spider(q):

html = requests.get('http://www.budejie.com', headers = headers)

# print(html.text)

xml = etree.HTML(html.text)

user_name = xml.xpath('//div[@class="u-txt"]/a/text()')

# print(user_name)

q.put(user_name)

def parse(q):

while True:

time.sleep(0.1)

if not q.empty():

value = q.get()

print(f'value:{value}')

break

if __name__ == '__main__':

q = Queue()

sp = Process(target=spider, args=(q,))

pa = Process(target=parse, args=(q,))

sp.start()

pa.start()

sp.join()

pa.join()

12. 管道

剛才我們使用 Queue 實(shí)現(xiàn)了進(jìn)程間的數(shù)據(jù)共享,那么進(jìn)程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用 Pipe,管道。

管道,我們可以把它理解為兩個(gè)進(jìn)程之間通信的通道。管道可以是單向的,即 half-duplex:一個(gè)進(jìn)程負(fù)責(zé)發(fā)消息,另一個(gè)進(jìn)程負(fù)責(zé)收消息;也可以是雙向的 duplex,即互相收發(fā)消息。

默認(rèn)聲明 Pipe 對(duì)象是雙向管道,如果要?jiǎng)?chuàng)建單向管道,可以在初始化的時(shí)候傳入 deplex 參數(shù)為 False。

我們用一個(gè)實(shí)例來(lái)感受一下:

from multiprocessing import Process, Pipe

class Consumer(Process):

def __init__(self, pipe):

Process.__init__(self)

self.pipe = pipe

def run(self):

self.pipe.send('Consumer Words')

print(f'Consumer Received: {self.pipe.recv()}')

class Producer(Process):

def __init__(self, pipe):

Process.__init__(self)

self.pipe = pipe

def run(self):

print(f'Producer Received: {self.pipe.recv()}')

self.pipe.send('Producer Words')

if __name__ == '__main__':

pipe = Pipe()

p = Producer(pipe[0])

c = Consumer(pipe[1])

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print('Main Process Ended')

在這個(gè)例子里我們聲明了一個(gè)默認(rèn)為雙向的管道,然后將管道的兩端分別傳給兩個(gè)進(jìn)程。兩個(gè)進(jìn)程互相收發(fā)。觀察一下結(jié)果:

Producer Received: Consumer Words

Consumer Received: Producer Words

Main Process Ended

管道 Pipe 就像進(jìn)程之間搭建的橋梁,利用它我們就可以很方便地實(shí)現(xiàn)進(jìn)程間通信了。

13. 進(jìn)程池

為什么需要進(jìn)程池與線程池呢,我就用前面我們?cè)谶M(jìn)行上下文切換的時(shí)候會(huì)有資源消耗,而在這個(gè)基礎(chǔ)上,創(chuàng)建線程與刪除線程都是需要消耗更多的資源。而這個(gè)池就節(jié)省了資源消耗,這樣我們就不用進(jìn)行創(chuàng)建和銷毀了,只要獲取里面的使用即可。這里我主要講一下進(jìn)程池與線程池的簡(jiǎn)單用法。

在講之前,我們先看幾個(gè)簡(jiǎn)單的例子:

第一種方法(多任務(wù)):

from multiprocessing import Pool

def function_square(data):

result = data*data

return result

if __name__ == '__main__':

inputs = [i for i in range(100)]

# inputs = (i for i in range(100))

# inputs = list(range(100))

pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會(huì)根據(jù)你電腦狀態(tài),自行創(chuàng)建。

# 按你的電腦自動(dòng)創(chuàng)建相應(yīng)的數(shù)目

# map 把任務(wù)交給進(jìn)程池

# pool.map(function, iterable)

pool_outputs = pool.map(function_square, inputs)

# pool_outputs = pool.map(function_square, (2,3, 4, 5))

pool.close()

pool.join()

print("Pool :", pool_outputs)

第二種方法(單任務(wù)):

from multiprocessing import Pool

def function_square(data):

result = data*data

return result

if __name__ == '__main__':

pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會(huì)根據(jù)你電腦狀態(tài),自行創(chuàng)建。(按你的電腦自動(dòng)創(chuàng)建相應(yīng)的數(shù)目)

# map 把任務(wù)交給進(jìn)程池

# pool.map(function, iterable)

pool_outputs = pool.apply(function_square, args=(10, ))

pool.close()

pool.join()

print("Pool :", pool_outputs)

使用 from multiprocessing import Pool:引入進(jìn)程池 ,那這個(gè)進(jìn)程池,它是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請(qǐng)求提交到進(jìn)程池,如果這個(gè)進(jìn)程池還沒(méi)有滿的話,就創(chuàng)建新的進(jìn)程來(lái)執(zhí)行請(qǐng)求。 如果池滿的話,就會(huì)先等待。

# 那么,我們可以首先聲明這個(gè)進(jìn)程池;

# 然后,使用 map 方法,那其實(shí)這個(gè) map 方法和正常的 map 方法是一致的。

# map:

# pool = Pool()

# pool.map(main, [i*10 for i in range(10)])

# 第一個(gè)參數(shù):他會(huì)將數(shù)組中的每一個(gè)元素拿出來(lái),當(dāng)作函數(shù)的一個(gè)個(gè)參數(shù),然后創(chuàng)建一個(gè)個(gè)進(jìn)程,放到進(jìn)程池里面去運(yùn)行。

# 第二個(gè)參數(shù):構(gòu)造一個(gè)數(shù)組,然后也就是 0 到 90 的這么一個(gè)循環(huán),那我們直接使用 list 構(gòu)造一下

接下來(lái),我們來(lái)系統(tǒng)的講解一下,進(jìn)程池。

在前面,我們講了可以使用 Process 來(lái)創(chuàng)建進(jìn)程,同時(shí)也講了如何用 Semaphore 來(lái)控制進(jìn)程的并發(fā)執(zhí)行數(shù)量。

假如現(xiàn)在我們遇到這么一個(gè)問(wèn)題,我有 10000 個(gè)任務(wù),每個(gè)任務(wù)需要啟動(dòng)一個(gè)進(jìn)程來(lái)執(zhí)行,并且一個(gè)進(jìn)程運(yùn)行完畢之后要緊接著啟動(dòng)下一個(gè)進(jìn)程,同時(shí)我還需要控制進(jìn)程的并發(fā)數(shù)量,不能并發(fā)太高,不然 CPU 處理不過(guò)來(lái)(如果同時(shí)運(yùn)行的進(jìn)程能維持在一個(gè)最高恒定值當(dāng)然利用率是最高的)。

那么我們?cè)撊绾蝸?lái)實(shí)現(xiàn)這個(gè)需求呢?

用 Process 和 Semaphore 可以實(shí)現(xiàn),但是實(shí)現(xiàn)起來(lái)比較我們可以用 Process 和 Semaphore 解決問(wèn)題,但是實(shí)現(xiàn)起來(lái)比較煩瑣。而這種需求在平時(shí)又是非常常見(jiàn)的。此時(shí),我們就可以派上進(jìn)程池了,即 multiprocessing 中的 Pool。

Pool 可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請(qǐng)求提交到 pool 中時(shí),如果池還沒(méi)有滿,就會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,直到池中有進(jìn)程結(jié)束,才會(huì)創(chuàng)建新的進(jìn)程來(lái)執(zhí)行它。

我們用一個(gè)實(shí)例來(lái)實(shí)現(xiàn)一下,代碼如下:

from multiprocessing import Pool

import time

def function(index):

print(f'Start process: {index}')

time.sleep(3)

print(f'End process {index}',)

if __name__ == '__main__':

pool = Pool(processes=3)# 不斷地改變?cè)摂?shù)字觀察輸出結(jié)果

for i in range(4):

pool.apply_async(function, args=(i, ))

print('Main Process started')

pool.close()

pool.join()

print("Main Process ended")

在這個(gè)例子中我們聲明了一個(gè)大小為 3 的進(jìn)程池,通過(guò) processes 參數(shù)來(lái)指定,如果不指定,那么會(huì)自動(dòng)根據(jù)處理器內(nèi)核來(lái)分配進(jìn)程數(shù)。接著我們使用 apply_async 方法將進(jìn)程添加進(jìn)去,args 可以用來(lái)傳遞參數(shù)。

運(yùn)行結(jié)果如下:

Main Process started

Start process: 0

Start process: 1

Start process: 2

End process 0

End process 1

End process 2

Start process: 3

End process 3

Main Process ended

進(jìn)程池大小為 3,所以最初可以看到有 3 個(gè)進(jìn)程同時(shí)執(zhí)行,第進(jìn)程池大小為 3,所以最初可以看到有 3 個(gè)進(jìn)程同時(shí)執(zhí)行,第4個(gè)進(jìn)程在等待,在有進(jìn)程運(yùn)行完畢之后,第4個(gè)進(jìn)程馬上跟著運(yùn)行,出現(xiàn)了如上的運(yùn)行效果。

最后,我們要記得調(diào)用 close 方法來(lái)關(guān)閉進(jìn)程池,使其不再接受新的任務(wù),然后調(diào)用 join 方法讓主進(jìn)程等待子進(jìn)程的退出,等子進(jìn)程運(yùn)行完畢之后,主進(jìn)程接著運(yùn)行并結(jié)束。

不過(guò)上面的寫(xiě)法多少有些煩瑣,這里再介紹進(jìn)程池一個(gè)更好用的 map 方法,可以將上述寫(xiě)法簡(jiǎn)化很多。

map 方法是怎么用的呢?

第一個(gè)參數(shù)就是要啟動(dòng)的進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)是一個(gè)可迭代對(duì)象,其中的每個(gè)元素會(huì)被傳遞給這個(gè)執(zhí)行方法。

舉個(gè)例子:

現(xiàn)在我們有一個(gè) list,里面包含了很多 URL,另外我們也定義了一個(gè)方法用來(lái)抓取每個(gè) URL 內(nèi)容并解析,那么我們可以直接在 map 的第一個(gè)參數(shù)傳入方法名,第 2 個(gè)參數(shù)傳入 URL 數(shù)組。

我們用一個(gè)實(shí)例來(lái)感受一下:

from multiprocessing import Pool

import urllib.request

import urllib.error

def scrape(url):

try:

urllib.request.urlopen(url)

print(f'URL {url} Scraped')

except (urllib.error.HTTPError, urllib.error.URLError):

print(f'URL {url} not Scraped')

if __name__ == '__main__':

pool = Pool(processes=3)

urls = [

'https://www.baidu.com',

'http://www.meituan.com/',

'https://www.aiyc.top/',

'http://xxxyxxx.net',

]

pool.map(scrape, urls)

pool.close()

這個(gè)例子中我們先定義了一個(gè) scrape 方法,它接收一個(gè)參數(shù) url,這里就是請(qǐng)求了一下這個(gè)鏈接,然后輸出爬取成功的信息,如果發(fā)生錯(cuò)誤,則會(huì)輸出爬取失敗的信息。

首先我們要初始化一個(gè) Pool,指定進(jìn)程數(shù)為 3。然后我們聲明一個(gè) urls 列表,接著我們調(diào)用了 map 方法,第 1 個(gè)參數(shù)就是進(jìn)程對(duì)應(yīng)的執(zhí)行方法,第 2 個(gè)參數(shù)就是 urls 列表,map 方法會(huì)依次將 urls 的每個(gè)元素作為 scrape 的參數(shù)傳遞并啟動(dòng)一個(gè)新的進(jìn)程,加到進(jìn)程池中執(zhí)行。

運(yùn)行結(jié)果如下:

URL https://www.baidu.com Scraped

URL http://xxxyxxx.net not Scraped

URL https://www.aiyc.top/ Scraped

URL http://www.meituan.com/ Scraped

這樣,我們就可以實(shí)現(xiàn) 3 個(gè)進(jìn)程并行運(yùn)行。不同的進(jìn)程相互獨(dú)立地輸出了對(duì)應(yīng)的爬取結(jié)果。

可以看到,我們利用 Pool 的 map 方法非常方便地實(shí)現(xiàn)了多進(jìn)程的執(zhí)行。后面我們也會(huì)在實(shí)戰(zhàn)案例中結(jié)合進(jìn)程池來(lái)實(shí)現(xiàn)數(shù)據(jù)的爬取。

以上便是 Python 中多進(jìn)程的基本用法,本節(jié)內(nèi)容比較多,后面的實(shí)戰(zhàn)案例也會(huì)用到這些內(nèi)容,需要好好掌握。

14. 實(shí)戰(zhàn)(貓眼 TOP100 + re + multiprocessing)

# !/usr/bin/python3

# -*- coding: utf-8 -*-

# @Author:AI悅創(chuàng) @DateTime :2020/2/12 15:23 @Function :功能 Development_tool :PyCharm

# code is far away from bugs with the god animal protecting

# I love animals. They taste delicious.

# https://maoyan.com/board/4?offset=0

# https://maoyan.com/board/4?offset=10

# https://maoyan.com/board/4?offset=20

# https://maoyan.com/board/4?offset=30

import requests,re,json

from requests.exceptions import RequestException

from multiprocessing import Pool # 引入進(jìn)程池

headers = {

'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36'

}

session = requests.Session()

session.headers = headers

def get_one_page(url):

try:

response = session.get(url)

if response.status_code == 200:

return response.text

return None

except RequestException:

return None

def parse_one_page(html):

pattern = re.compile('

.*?board-index.*?>(\d+).*?data-src="(.*?)".*?name.*?>

+'.*?>(.*?).*?star">(.*?)

.*?releasetime">(.*?)

.*?integer">'

+'(.*?).*?fraction">(.*?).*?

', re.S)

# 標(biāo)簽的開(kāi)始和結(jié)尾都要寫(xiě)出來(lái)!!!

items = re.findall(pattern, html)

# 使用 yield 把這個(gè)方法變成一個(gè)生成器

# 要把返回的結(jié)果做成一個(gè)鍵值對(duì)的形式

for item in items:

yield {

'index': item[0],

'image': item[1],

'title': item[2],

'actor': item[3].strip()[17:],

'time': item[4][5:],

'score': item[5]+item[6]

}

def write_to_file(content):

# print(type(content))

# with open('result.txt', 'a') as f:

with open('result.txt', 'a', encoding='utf-8') as f:

# 字典轉(zhuǎn)換成字符串

# f.write(json.dumps(content) + '\n') # 中文編碼變成 Unicode

f.write(json.dumps(content, ensure_ascii=False) + '\n')

f.close()

def main(offset):

url = f'https://maoyan.com/board/4?offset={offset}'

html = get_one_page(url)

for item in parse_one_page(html):

print(item)

write_to_file(item)

# 1.0

# if __name__ == '__main__':

# for i in range(10): # range(0, 100, 10)

# main(i*10)

# 2.0

if __name__ == '__main__':

pool = Pool()

pool.map(main, [i*10 for i in range(10)])

# 優(yōu)化,如果你要秒抓的話,使用 from multiprocessing import Pool # 引入進(jìn)程池 ,當(dāng)然我們目的不是秒抓,而是學(xué)習(xí)一下多進(jìn)程的用法

# 那么這個(gè)進(jìn)程池,他是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請(qǐng)求提交到進(jìn)程池,如果這個(gè)進(jìn)程池還沒(méi)有滿的話,就創(chuàng)建新的進(jìn)程來(lái)執(zhí)行請(qǐng)求。

# 如果池滿的話,就會(huì)先等待

# 那么,我們可以首先聲明這個(gè)進(jìn)程池;

# 然后,使用 map 方法,那其實(shí)這個(gè) map 方法和正常的 map 方法是一致的。

# map:

# pool = Pool()

# pool.map(main, [i*10 for i in range(10)])

# 第一個(gè)參數(shù):他會(huì)將數(shù)組中的每一個(gè)元素拿出來(lái),當(dāng)作函數(shù)的一個(gè)個(gè)參數(shù),然后創(chuàng)建一個(gè)個(gè)進(jìn)程,放到進(jìn)程池里面去運(yùn)行。

# 第二個(gè)參數(shù):構(gòu)造一個(gè)數(shù)組,然后也就是 0 到 90 的這么一個(gè)循環(huán),那我們直接使用 list 構(gòu)造一下

15. 補(bǔ)充:線程池

我找了許多包,這個(gè)包還是不錯(cuò)的:Pip install threadpool

# project = 'Code', file_name = '線程池', author = 'AI悅創(chuàng)'

# time = '2020/3/3 0:05', product_name = PyCharm

# code is far away from bugs with the god animal protecting

# I love animals. They taste delicious.

import time

import threadpool

# 執(zhí)行比較耗時(shí)的函數(shù),需要開(kāi)多線程

def get_html(url):

time.sleep(3)

print(url)

# 按原本的單線程運(yùn)行時(shí)間為:300s

# 而多線程池的化:30s

# 使用多線程執(zhí)行 telent 函數(shù)

urls = [i for i in range(100)]

pool = threadpool.ThreadPool(10) # 建立線程池

# 提交任務(wù)給線程池

requests = threadpool.makeRequests(get_html, urls)

# 開(kāi)始執(zhí)行任務(wù)

for req in requests:

pool.putRequest(req)

pool.wait()

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請(qǐng)聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請(qǐng)及時(shí)聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車(chē)的華為或?qū)⒋呱龈蟮莫?dú)角獸公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

倫敦2024年8月29日 /美通社/ -- 英國(guó)汽車(chē)技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車(chē)工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車(chē)。 SODA V工具的開(kāi)發(fā)耗時(shí)1.5...

關(guān)鍵字: 汽車(chē) 人工智能 智能驅(qū)動(dòng) BSP

北京2024年8月28日 /美通社/ -- 越來(lái)越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運(yùn)行,同時(shí)企業(yè)卻面臨越來(lái)越多業(yè)務(wù)中斷的風(fēng)險(xiǎn),如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報(bào)道,騰訊和網(wǎng)易近期正在縮減他們對(duì)日本游戲市場(chǎng)的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)開(kāi)幕式在貴陽(yáng)舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國(guó)國(guó)際大數(shù)據(jù)產(chǎn)業(yè)博覽會(huì)上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語(yǔ)權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點(diǎn): 有效應(yīng)對(duì)環(huán)境變化,經(jīng)營(yíng)業(yè)績(jī)穩(wěn)中有升 落實(shí)提質(zhì)增效舉措,毛利潤(rùn)率延續(xù)升勢(shì) 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長(zhǎng) 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競(jìng)爭(zhēng)力 堅(jiān)持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競(jìng)爭(zhēng)優(yōu)勢(shì)...

關(guān)鍵字: 通信 BSP 電信運(yùn)營(yíng)商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺(tái)與中國(guó)電影電視技術(shù)學(xué)會(huì)聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會(huì)上宣布正式成立。 活動(dòng)現(xiàn)場(chǎng) NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長(zhǎng)三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會(huì)上,軟通動(dòng)力信息技術(shù)(集團(tuán))股份有限公司(以下簡(jiǎn)稱"軟通動(dòng)力")與長(zhǎng)三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉