當(dāng)前位置:首頁 > 技術(shù)學(xué)院 > 技術(shù)前線
[導(dǎo)讀]在上一課時我們了解了多線程的基本概念,同時我們也提到,Python 中的多線程是不能很好發(fā)揮多核優(yōu)勢的,如果想要發(fā)揮多核優(yōu)勢,最好還是使用多進(jìn)程。 那么本課時我們就來了解下多進(jìn)程的基本概念和用 Python 實現(xiàn)多進(jìn)程的方法。

1. 多進(jìn)程的含義

進(jìn)程(Process)是具有一定獨立功能的程序關(guān)于某個數(shù)據(jù)集合上的一次運行活動,是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個獨立單位。

顧名思義,多進(jìn)程就是啟用多個進(jìn)程同時運行。由于進(jìn)程是線程的集合,而且進(jìn)程是由一個或多個線程構(gòu)成的,所以多進(jìn)程的運行意味著有大于或等于進(jìn)程數(shù)量的線程在運行。

2. Python 多進(jìn)程的優(yōu)勢

通過上一課時我們知道,由于進(jìn)程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢,一個進(jìn)程中的多個線程,在同一時刻只能有一個線程運行。

而對于多進(jìn)程來說,每個進(jìn)程都有屬于自己的 GIL,所以,在多核處理器下,多進(jìn)程的運行是不會受 GIL 的影響的。因此,多進(jìn)程能更好地發(fā)揮多核的優(yōu)勢。

當(dāng)然,對于爬蟲這種 IO 密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對于計算密集型任務(wù)來說,Python 的多進(jìn)程相比多線程,其多核運行效率會有成倍的提升。

總的來說,Python 的多進(jìn)程整體來看是比多線程更有優(yōu)勢的。所以,在條件允許的情況下,能用多進(jìn)程就盡量用多進(jìn)程。

不過值得注意的是,由于進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個獨立單位,所以各個進(jìn)程之間的數(shù)據(jù)是無法共享的,如多個進(jìn)程無法共享一個全局變量,進(jìn)程之間的數(shù)據(jù)共享需要有單獨的機(jī)制來實現(xiàn),這在后面也會講到。

3. 多進(jìn)程的實現(xiàn)

在 Python 中也有內(nèi)置的庫來實現(xiàn)多進(jìn)程,它就是 multiprocessing。多線程在 IO 密集型用的比較多,也就是在爬蟲方面用的比較多。而 CPU 密集型根本就不用多線程。

我們一般的策略是,多進(jìn)程加多線程,這樣的結(jié)合是最好。

multiprocessing 提供了一系列的組件,如 Process(進(jìn)程)、Queue(隊列)、Semaphore(信號量)、Pipe(管道)、Lock(鎖)、Pool(進(jìn)程池)等,接下來讓我們來了解下它們的使用方法。

4. 直接使用 Process 類

在 multiprocessing 中,每一個進(jìn)程都用一個 Process 類來表示。它的 API 調(diào)用如下:

Process([group [, target [, name [, args [, kwargs]]]]])

target 表示調(diào)用對象,你可以傳入方法的名字。

args 表示被調(diào)用對象的位置參數(shù)元組,比如 target 是函數(shù) func,他有兩個參數(shù) m,n,那么 args 就傳入 [m, n] 即可。

kwargs 表示調(diào)用對象的字典。

name 是別名,相當(dāng)于給這個進(jìn)程取一個名字。

group 分組。

我們先用一個實例來感受一下:

# 示例一:

import multiprocessing

def process(index):

print(f"Process: {index}")

if __name__ == '__main__':

for i in range(5):

p = multiprocessing.Process(target=process, args=(i,))

p.start()

# 示例二

import multiprocessing

import time

def start(i):

time.sleep(3)

print(i)

# current process

# 當(dāng)前進(jìn)程

print(multiprocessing.current_process().name) # 當(dāng)前進(jìn)程的名字

print(multiprocessing.current_process().pid) # 進(jìn)程控制符

print(multiprocessing.current_process().is_alive()) # 判斷進(jìn)程是否存活

# 因為,我們有些進(jìn)程卡死,所以我就要自己把進(jìn)程卡死

if __name__ == '__main__':

print('start')

p = multiprocessing.Process(target=start, args=(1,), name='p1')

p.start()

print('stop')

這是一個實現(xiàn)多進(jìn)程最基礎(chǔ)的方式:通過創(chuàng)建 Process 來新建一個子進(jìn)程,其中 target 參數(shù)傳入方法名,args 是方法的參數(shù),是以元組的形式傳入,其和被調(diào)用的方法 process 的參數(shù)是一一對應(yīng)的。

注意:這里 args 必須要是一個元組,如果只有一個參數(shù),那也要在元組第一個元素后面加一個逗號,如果沒有逗號則和單個元素本身沒有區(qū)別,無法構(gòu)成元組,導(dǎo)致參數(shù)傳遞出現(xiàn)問題。

創(chuàng)建完進(jìn)程之后,我們通過調(diào)用 start 方法即可啟動進(jìn)程了。示例一運行結(jié)果如下:

Process: 0

Process: 1

Process: 2

Process: 3

Process: 4

可以看到,我們運行了 5 個子進(jìn)程,每個進(jìn)程都調(diào)用了 process方法。process方法的 index參數(shù)通過 Process的 args傳入,分別是0~4 這 5 個序號,最后打印出來,5 個子進(jìn)程運行結(jié)束。

由于進(jìn)程是 Python 中最小的資源分配單元,因此這些進(jìn)程和線程不同,各個進(jìn)程之間的數(shù)據(jù)是不會共享的,每啟動一個進(jìn)程,都會獨立分配資源。

另外,在當(dāng)前 CPU 核數(shù)足夠的情況下,這些不同的進(jìn)程會分配給不同的 CPU核來運行,實現(xiàn)真正的并行執(zhí)行。

multiprocessing還提供了幾個比較有用的方法,如我們可以通過 cpu_count 的方法來獲取當(dāng)前機(jī)器 CPU的核心數(shù)量,通過 active_children方法獲取當(dāng)前還在運行的所有進(jìn)程。

下面通過一個實例來看一下:

import multiprocessing

import time

def process(index):

time.sleep(index)

print(f"Process: {index}")

if __name__ == '__main__':

for i in range(5):

p = multiprocessing.Process(target=process, args=[i,])

p.start()

print(f"CPU number: {multiprocessing.cpu_count()}")

for p in multiprocessing.active_children():

print(f"Child process name: {p.name} id: {p.pid}")

print("Process Ended")

運行結(jié)果如下:

Process: 0

CPU number: 8

Child process name: Process-5 id: 73595

Child process name: Process-2 id: 73592

Child process name: Process-3 id: 73593

Child process name: Process-4 id: 73594

Process Ended

Process: 1

Process: 2

Process: 3

Process: 4

在上面的例子中我們通過 cpu_count成功獲取了 CPU 核心的數(shù)量:8 個,當(dāng)然不同的機(jī)器結(jié)果可能不同。

另外我們還通過 active_children獲取到了當(dāng)前正在活躍運行的進(jìn)程列表。然后我們遍歷了每個進(jìn)程,并將它們的名稱和進(jìn)程號打印出來了,這里進(jìn)程號直接使用 pid屬性即可獲取,進(jìn)程名稱直接通過 name 屬性即可獲取。

以上我們就完成了多進(jìn)程的創(chuàng)建和一些基本信息的獲取。

5. 繼承 Process 類

在上面的例子中,我們創(chuàng)建進(jìn)程是直接使用 Process這個類來創(chuàng)建的,這是一種創(chuàng)建進(jìn)程的方式。不過,創(chuàng)建進(jìn)程的方式不止這一種,同樣,我們也可以像線程 Thread一樣來通過繼承的方式創(chuàng)建一個進(jìn)程類,進(jìn)程的基本操作我們在子類的 run方法中實現(xiàn)即可。

通過一個實例來看一下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

for i in range(2, 5):

p = MyProcess(i)

p.start()

我們首先聲明了一個構(gòu)造方法,這個方法接收一個 loop參數(shù),代表循環(huán)次數(shù),并將其設(shè)置為全局變量。在 run方法中,又使用這個 loop變量循環(huán)了 loop次并打印了當(dāng)前的進(jìn)程號和循環(huán)次數(shù)。

在調(diào)用時,我們用 range 方法得到了 2、3、4 三個數(shù)字,并把它們分別初始化了 MyProcess 進(jìn)程,然后調(diào)用 start 方法將進(jìn)程啟動起來。

注意:這里進(jìn)程的執(zhí)行邏輯需要在 run 方法中實現(xiàn),啟動進(jìn)程需要調(diào)用 start 方法,調(diào)用之后 run 方法便會執(zhí)行。

運行結(jié)果如下:

Pid: 73667 LoopCount: 0

Pid: 73668 LoopCount: 0

Pid: 73669 LoopCount: 0

Pid: 73667 LoopCount: 1

Pid: 73668 LoopCount: 1

Pid: 73669 LoopCount: 1

Pid: 73668 LoopCount: 2

Pid: 73669 LoopCount: 2

Pid: 73669 LoopCount: 3

可以看到,三個進(jìn)程分別打印出了 2、3、4 條結(jié)果,即進(jìn)程 73667 打印了 2 次 結(jié)果,進(jìn)程 73668 打印了 3 次結(jié)果,進(jìn)程 73669 打印了 4 次結(jié)果。

注意,這里的進(jìn)程 pid 代表進(jìn)程號,不同機(jī)器、不同時刻運行結(jié)果可能不同。

通過上面的方式,我們也非常方便地實現(xiàn)了一個進(jìn)程的定義。為了復(fù)用方便,我們可以把一些方法寫在每個進(jìn)程類里封裝好,在使用時直接初始化一個進(jìn)程類運行即可。

PID(進(jìn)程控制符)英文全稱為 Process Identifier,它也屬于電工電子類技術(shù)術(shù)語。

PID就是各進(jìn)程的身份標(biāo)識,程序一運行系統(tǒng)就會自動分配給進(jìn)程一個獨一無二的PID。進(jìn)程中止后PID被系統(tǒng)回收,可能會被繼續(xù)分配給新運行的程序。

PID一列代表了各進(jìn)程的進(jìn)程ID,也就是說,PID就是各進(jìn)程的身份標(biāo)識。

在實際調(diào)試中,只能先大致設(shè)定一個經(jīng)驗值,然后根據(jù)調(diào)節(jié)效果修改。

6. 守護(hù)進(jìn)程

在多進(jìn)程中,同樣存在守護(hù)進(jìn)程的概念,如果一個進(jìn)程被設(shè)置為守護(hù)進(jìn)程,當(dāng)父進(jìn)程結(jié)束后,子進(jìn)程會自動被終止,我們可以通過設(shè)置 daemon 屬性來控制是否為守護(hù)進(jìn)程。

還是原來的例子,增加了 deamon屬性的設(shè)置:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

for i in range(2, 5):

p = MyProcess(i)

p.daemon = True

p.start()

print("Main Process ended")

運行結(jié)果如下:

Main Process ended

結(jié)果很簡單,因為主進(jìn)程沒有做任何事情,直接輸出一句話結(jié)束,所以在這時也直接終止了子進(jìn)程的運行。

這樣可以有效防止無控制地生成子進(jìn)程。這樣的寫法可以讓我們在主進(jìn)程運行結(jié)束后無需額外擔(dān)心子進(jìn)程是否關(guān)閉,避免了獨立子進(jìn)程的運行。

7. 進(jìn)程等待

上面的運行效果其實不太符合我們預(yù)期:主進(jìn)程運行結(jié)束時,子進(jìn)程(守護(hù)進(jìn)程)也都退出了,子進(jìn)程什么都沒來得及執(zhí)行。

能不能讓所有子進(jìn)程都執(zhí)行完了然后再結(jié)束呢?當(dāng)然是可以的,只需要加入 join 方法即可,我們可以將代碼改寫如下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

print("Main Process start")

processes = []

for i in range(2, 5):

p = MyProcess(i)

processes.append(p)

p.daemon = True

p.start()

for p in processes:

p.join()

print("Main Process ended")

運行結(jié)果如下:

Main Process start

Pid: 11776 LoopCount: 0

Pid: 16824 LoopCount: 0

Pid: 10552 LoopCount: 0

Pid: 11776 LoopCount: 1

Pid: 16824 LoopCount: 1

Pid: 10552 LoopCount: 1

Pid: 16824 LoopCount: 2

Pid: 10552 LoopCount: 2

Pid: 10552 LoopCount: 3

Main Process ended

在調(diào)用 start和 join方法后,父進(jìn)程就可以等待所有子進(jìn)程都執(zhí)行完畢后,再打印出結(jié)束的結(jié)果。

默認(rèn)情況下,join是無限期的。也就是說,如果有子進(jìn)程沒有運行完畢,主進(jìn)程會一直等待。這種情況下,如果子進(jìn)程出現(xiàn)問題陷入了死循環(huán),主進(jìn)程也會無限等待下去。

怎么解決這個問題呢?

可以給 join 方法傳遞一個超時參數(shù),代表最長等待秒數(shù)。如果子進(jìn)程沒有在這個指定秒數(shù)之內(nèi)完成,會被強(qiáng)制返回,主進(jìn)程不再會等待。也就是說這個參數(shù)設(shè)置了主進(jìn)程等待該子進(jìn)程的最長時間。

例如這里我們傳入 1,代表最長等待 1 秒,代碼改寫如下:

from multiprocessing import Process

import time

class MyProcess(Process):

def __init__(self, loop):

Process.__init__(self)

self.loop = loop

def run(self):

for count in range(self.loop):

time.sleep(1)

print(f"Pid: {self.pid} LoopCount: {count}")

if __name__ == '__main__':

processes = []

for i in range(3, 5):

p = MyProcess(i)

processes.append(p)

p.daemon = True

p.start()

for p in processes:

p.join(1)

print("Main Process ended")

運行結(jié)果如下:

Pid: 40970 LoopCount: 0

Pid: 40971 LoopCount: 0

Pid: 40970 LoopCount: 1

Pid: 40971 LoopCount: 1

Main Process ended

可以看到,有的子進(jìn)程本來要運行 3 秒,結(jié)果運行 1 秒就被強(qiáng)制返回了,由于是守護(hù)進(jìn)程,該子進(jìn)程被終止了。

到這里,我們就了解了守護(hù)進(jìn)程、進(jìn)程等待和超時設(shè)置的用法。

8. 終止進(jìn)程

當(dāng)然,終止進(jìn)程不止有守護(hù)進(jìn)程這一種做法,我們也可以通過 terminate 方法來終止某個子進(jìn)程,另外我們還可以通過 is_alive方法判斷進(jìn)程是否還在運行。

下面我們來看一個實例:

import multiprocessing, time

def process():

print("Starting")

time.sleep(5)

print("Finished")

if __name__ == '__main__':

p = multiprocessing.Process(target=process)

print("Before:", p, p.is_alive())

p.start()

print("During:", p, p.is_alive())

p.terminate()

print("Terminate:", p, p.is_alive())

p.join()

print("Joined:", p, p.is_alive())

在上面的例子中,我們用 Process創(chuàng)建了一個進(jìn)程,接著調(diào)用 start方法啟動這個進(jìn)程,然后調(diào)用 terminate方法將進(jìn)程終止,最后調(diào)用 join方法。

另外,在進(jìn)程運行不同的階段,我們還通過 is_alive方法判斷當(dāng)前進(jìn)程是否還在運行。

運行結(jié)果如下:

Before: False

During: True

Terminate: True

Joined: False

這里有一個值得注意的地方,在調(diào)用 terminate 方法之后,我們用 is_alive 方法獲取進(jìn)程的狀態(tài)發(fā)現(xiàn)依然還是運行狀態(tài)。在調(diào)用 join 方法之后,is_alive 方法獲取進(jìn)程的運行狀態(tài)才變?yōu)榻K止?fàn)顟B(tài)。

所以,在調(diào)用 terminate 方法之后,記得要調(diào)用一下 join 方法,這里調(diào)用 join 方法可以為進(jìn)程提供時間來更新對象狀態(tài),用來反映出最終的進(jìn)程終止效果。

9. 進(jìn)程互斥鎖

在上面的一些實例中,我們可能會遇到如下的運行結(jié)果:

Pid: 73993 LoopCount: 0

Pid: 73993 LoopCount: 1

Pid: 73994 LoopCount: 0Pid: 73994 LoopCount: 1

Pid: 73994 LoopCount: 2

Pid: 73995 LoopCount: 0

Pid: 73995 LoopCount: 1

Pid: 73995 LoopCount: 2

Pid: 73995 LoopCount: 3

Main Process ended

我們發(fā)現(xiàn),有的輸出結(jié)果沒有換行。這是什么原因造成的呢?

這種情況是由多個進(jìn)程 并行執(zhí)行 導(dǎo)致的,兩個進(jìn)程同時進(jìn)行了輸出,結(jié)果第一個進(jìn)程的換行沒有來得及輸出,第二個進(jìn)程就輸出了結(jié)果,導(dǎo)致最終輸出沒有換行。

那如何來避免這種問題?

如果我們能保證,多個進(jìn)程運行期間的任一時間,只能一個進(jìn)程輸出,其他進(jìn)程等待,等剛才那個進(jìn)程輸出完畢之后,另一個進(jìn)程再進(jìn)行輸出,這樣就不會出現(xiàn)輸出沒有換行的現(xiàn)象了。

這種解決方案實際上就是實現(xiàn)了進(jìn)程互斥,避免了多個進(jìn)程同時搶占臨界區(qū)(輸出)資源。

我們可以通過 multiprocessing 中的 Lock 來實現(xiàn)。Lock ,即鎖,在一個進(jìn)程輸出時,加鎖,其他進(jìn)程等待。等此進(jìn)程執(zhí)行結(jié)束后,釋放鎖,其他進(jìn)程可以進(jìn)行輸出。

我們首先實現(xiàn)一個不加鎖的實例,代碼如下:

from multiprocessing import Process, Lock

import time

class MyProcess(Process):

def __init__(self, loop, lock):

Process.__init__(self)

self.loop = loop

self.lock = lock

def run(self):

for count in range(self.loop):

time.sleep(0.1)

# self.lock.acquire()

print(f"Pid: {self.pid} LoopCount: {count}")

# self.lock.release()

if __name__ == '__main__':

lock = Lock()

for i in range(10, 15):

p = MyProcess(i, lock)

p.start()

運行結(jié)果如下:

Pid: 74030 LoopCount: 0

Pid: 74031 LoopCount: 0

Pid: 74032 LoopCount: 0

Pid: 74033 LoopCount: 0

Pid: 74034 LoopCount: 0

Pid: 74030 LoopCount: 1

Pid: 74031 LoopCount: 1

Pid: 74032 LoopCount: 1Pid: 74033 LoopCount: 1

Pid: 74034 LoopCount: 1

Pid: 74030 LoopCount: 2

...

可以看到運行結(jié)果中有些輸出已經(jīng)出現(xiàn)了不換行的問題。

我們對其加鎖,取消掉剛才代碼中的兩行注釋,重新運行,運行結(jié)果如下:

Pid: 74061 LoopCount: 0

Pid: 74062 LoopCount: 0

Pid: 74063 LoopCount: 0

Pid: 74064 LoopCount: 0

Pid: 74065 LoopCount: 0

Pid: 74061 LoopCount: 1

Pid: 74062 LoopCount: 1

Pid: 74063 LoopCount: 1

Pid: 74064 LoopCount: 1

Pid: 74065 LoopCount: 1

Pid: 74061 LoopCount: 2

Pid: 74062 LoopCount: 2

Pid: 74064 LoopCount: 2

...

這時輸出效果就正常了。

所以,在訪問一些臨界區(qū)資源時,使用 Lock可以有效避免進(jìn)程同時占用資源而導(dǎo)致的一些問題。

10. 信號量

信號量(Semaphore),有時被稱為信號燈,是在多線程環(huán)境下使用的一種設(shè)施,是可以用來保證兩個或多個關(guān)鍵代碼段不被并發(fā)調(diào)用。在進(jìn)入一個關(guān)鍵代碼段之前,線程必須獲取一個信號量;一旦該關(guān)鍵代碼段完成了,那么該線程必須釋放信號量。其它想進(jìn)入該關(guān)鍵代碼段的線程必須等待直到第一個線程釋放信號量。為了完成這個過程,需要創(chuàng)建一個信號量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個關(guān)鍵代碼段的首末端。確認(rèn)這些信號量VI引用的是初始創(chuàng)建的信號

以一個停車場的運作為例。簡單起見,假設(shè)停車場只有三個車位,一開始三個車位都是空的。這時如果同時來了五輛車,看門人允許其中三輛直接進(jìn)入,然后放下車攔,剩下的車則必須在入口等待,此后來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知后,打開車攔,放入外面的一輛進(jìn)去,如果又離開兩輛,則又可以放入兩輛,如此往復(fù)。

在這個停車場系統(tǒng)中,車位是公共資源,每輛車好比一個線程,看門人起的就是信號量的作用。

進(jìn)程互斥鎖可以使同一時刻只有一個進(jìn)程能訪問共享資源,如上面的例子所展示的那樣,在同一時刻只能有一個進(jìn)程輸出結(jié)果。但有時候我們需要允許多個進(jìn)程來訪問共享資源,同時還需要限制能訪問共享資源的進(jìn)程的數(shù)量。

這種需求該如何實現(xiàn)呢?

可以用信號量,信號量是進(jìn)程同步過程中一個比較重要的角色。它可以控制臨界資源的數(shù)量,實現(xiàn)多個進(jìn)程同時訪問共享資源,限制進(jìn)程的并發(fā)量。

如果你學(xué)過操作系統(tǒng),那么一定對這方面非常了解,如果你還不了解信號量是什么,可以先熟悉一下這個概念。

我們可以用 multiprocessing 庫中的 Semaphore 來實現(xiàn)信號量。

那么接下來我們就用一個實例來演示一下進(jìn)程之間利用 Semaphore做到多個進(jìn)程共享資源,同時又限制同時可訪問的進(jìn)程數(shù)量,代碼如下:

from multiprocessing import Process, Semaphore, Lock, Queue

import time

buffer = Queue(10)

empty = Semaphore(2)

full = Semaphore(0)

lock = Lock()

class Consumer(Process):

def run(self):

global buffer, empty, full, lock

while True:

full.acquire()

lock.acquire()

buffer.get()

print('Consumer pop an element')

time.sleep(1)

lock.release()

empty.release()

class Producer(Process):

def run(self):

global buffer, empty, full, lock

while True:

empty.acquire()

lock.acquire()

buffer.put(1)

print('Producer append an element')

time.sleep(1)

lock.release()

full.release()

if __name__ == '__main__':

p = Producer()

c = Consumer()

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print("Main Process Ended")

如上代碼實現(xiàn)了經(jīng)典的生產(chǎn)者和消費者問題。它定義了兩個進(jìn)程類,一個是消費者,一個是生產(chǎn)者。

另外,這里使用 multiprocessing 中的 Queue 定義了一個共享隊列,然后定義了兩個信號量 Semaphore,一個代表緩沖區(qū)空余數(shù),一個表示緩沖區(qū)占用數(shù)。

生產(chǎn)者 Producer使用 acquire方法來占用一個緩沖區(qū)位置,緩沖區(qū)空閑區(qū)大小減 1,接下來進(jìn)行加鎖,對緩沖區(qū)進(jìn)行操作,然后釋放鎖,最后讓代表占用的緩沖區(qū)位置數(shù)量加 1,消費者則相反。

運行結(jié)果如下:

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

Consumer pop an element

Consumer pop an element

Producer append an element

Producer append an element

我們發(fā)現(xiàn)兩個進(jìn)程在交替運行,生產(chǎn)者先放入緩沖區(qū)物品,然后消費者取出,不停地進(jìn)行循環(huán)。 你可以通過上面的例子來體會信號量 Semaphore 的用法,通過 Semaphore 我們很好地控制了進(jìn)程對資源的并發(fā)訪問數(shù)量。

11. 隊列

在上面的例子中我們使用 Queue作為進(jìn)程通信的共享隊列使用。

而如果我們把上面程序中的 Queue 換成普通的 list,是完全起不到效果的,因為進(jìn)程和進(jìn)程之間的資源是不共享的。即使在一個進(jìn)程中改變了這個 list,在另一個進(jìn)程也不能獲取到這個 list 的狀態(tài),所以聲明全局變量對多進(jìn)程是沒有用處的。

那進(jìn)程如何共享數(shù)據(jù)呢?

可以用 Queue,即隊列。當(dāng)然這里的隊列指的是 multiprocessing里面的 Queue。

依然用上面的例子,我們一個進(jìn)程向隊列中放入隨機(jī)數(shù)據(jù),然后另一個進(jìn)程取出數(shù)據(jù)。

from multiprocessing import Process, Semaphore, Lock, Queue

import time

from random import random

buffer = Queue(10)

empty = Semaphore(2)

full = Semaphore(0)

lock = Lock()

class Consumer(Process):

def run(self):

global buffer, empty, full, lock

while True:

full.acquire()

lock.acquire()

print(f'Consumer get {buffer.get()}')

time.sleep(1)

lock.release()

empty.release()

class Producer(Process):

def run(self):

global buffer, empty, full, lock

while True:

empty.acquire()

lock.acquire()

num = random()

print(f'Producer put {num}')

buffer.put(num)

time.sleep(1)

lock.release()

full.release()

if __name__ == '__main__':

p = Producer()

c = Consumer()

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print("Main Process Ended")

運行結(jié)果如下:

Producer put 0.719213647437

Producer put 0.44287326683

Consumer get 0.719213647437

Consumer get 0.44287326683

Producer put 0.722859424381

Producer put 0.525321338921

Consumer get 0.722859424381

Consumer get 0.525321338921

·······

在上面的例子中我們聲明了兩個進(jìn)程,一個進(jìn)程為生產(chǎn)者 Producer,另一個為消費者 Consumer,生產(chǎn)者不斷向 Queue里面添加隨機(jī)數(shù),消費者不斷從隊列里面取隨機(jī)數(shù)。

生產(chǎn)者在放數(shù)據(jù)的時候調(diào)用了 Queue 的 put 方法,消費者在取的時候使用了 get 方法,這樣我們就通過 Queue 實現(xiàn)兩個進(jìn)程的數(shù)據(jù)共享了。

上面的例子有些不好理解,接下來,我們來用個比較簡單的例子:

Python 多進(jìn)程之間是默認(rèn)無法通信的,因為是并行執(zhí)行的。所以需要借助其他數(shù)據(jù)結(jié)構(gòu)。

舉個例子:

你一個進(jìn)程抓取到數(shù)據(jù),要給另一個進(jìn)程用,就需要進(jìn)程通信。

隊列:就像排隊一樣,先進(jìn)先出。也就是你先放進(jìn)去的數(shù)據(jù),也就先取出數(shù)據(jù)。

棧:主要用在 C 和 C++ 上的數(shù)據(jù)結(jié)構(gòu)。主要存儲用戶自定義的數(shù)據(jù)。它是后進(jìn)先出。先進(jìn)去的墊在底層,后進(jìn)的在上面。

from multiprocessing import Process, Queue

# Process :進(jìn)程

# Queue :隊列

# import multiprocessing

def write(q):

# multiprocessing.current_process().name

# multiprocessing.current_process().pid

# multiprocessing.current_process().is_alive()

print("Process to write: {}" .format(Process.pid))

for i in range(10):

print("Put {} to queue...".format(i))

q.put(i) # 把數(shù)字放到我們的隊列里面去

def read(q):

print("Process to read: {}" .format(Process.pid))

while True:

# 這里為什么要使用 while 呢?因為我們要不斷的循環(huán),隊列當(dāng)中有可能沒有數(shù)據(jù),所以需要一直循環(huán)獲取。

# 當(dāng)然,你也可以直接指定循環(huán)的次數(shù)

value = q.get() # 獲取隊列中的數(shù)據(jù)(隊列中沒有數(shù)據(jù)就會阻塞在那里)

print("Get {} from queue." .format(value))

# 所以就有以下策略:一個線程抓取 url 放入隊列之中,另一個隊列解析

if __name__ == '__main__':

# 父進(jìn)程創(chuàng)建 Queue ,并傳給各個子進(jìn)程:

q = Queue() # 隊列

pw = Process(target=write, args=(q, ))

pr = Process(target=read, args=(q, ))

# 啟動子進(jìn)程 pw ,寫入:

pw.start()

# 啟動子進(jìn)程 pr, 讀?。?

pr.start()

# 等待 pw 結(jié)束

pw.join()

# 等待 pr 結(jié)束

pr.join()

舉個實操的小例子:

"""

project = 'Code', file_name = 'duoxianc', author = 'AI悅創(chuàng)'

time = '2020/4/20 11:31', product_name = PyCharm

# code is far away from bugs with the god animal protecting

I love animals. They taste delicious.

"""

import time

from multiprocessing import Process, Queue

import requests

from lxml import etree

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'}

def spider(q):

html = requests.get('http://www.budejie.com', headers = headers)

# print(html.text)

xml = etree.HTML(html.text)

user_name = xml.xpath('//div[@class="u-txt"]/a/text()')

# print(user_name)

q.put(user_name)

def parse(q):

while True:

time.sleep(0.1)

if not q.empty():

value = q.get()

print(f'value:{value}')

break

if __name__ == '__main__':

q = Queue()

sp = Process(target=spider, args=(q,))

pa = Process(target=parse, args=(q,))

sp.start()

pa.start()

sp.join()

pa.join()

12. 管道

剛才我們使用 Queue 實現(xiàn)了進(jìn)程間的數(shù)據(jù)共享,那么進(jìn)程之間直接通信,如收發(fā)信息,用什么比較好呢?可以用 Pipe,管道。

管道,我們可以把它理解為兩個進(jìn)程之間通信的通道。管道可以是單向的,即 half-duplex:一個進(jìn)程負(fù)責(zé)發(fā)消息,另一個進(jìn)程負(fù)責(zé)收消息;也可以是雙向的 duplex,即互相收發(fā)消息。

默認(rèn)聲明 Pipe 對象是雙向管道,如果要創(chuàng)建單向管道,可以在初始化的時候傳入 deplex 參數(shù)為 False。

我們用一個實例來感受一下:

from multiprocessing import Process, Pipe

class Consumer(Process):

def __init__(self, pipe):

Process.__init__(self)

self.pipe = pipe

def run(self):

self.pipe.send('Consumer Words')

print(f'Consumer Received: {self.pipe.recv()}')

class Producer(Process):

def __init__(self, pipe):

Process.__init__(self)

self.pipe = pipe

def run(self):

print(f'Producer Received: {self.pipe.recv()}')

self.pipe.send('Producer Words')

if __name__ == '__main__':

pipe = Pipe()

p = Producer(pipe[0])

c = Consumer(pipe[1])

p.daemon = c.daemon = True

p.start()

c.start()

p.join()

c.join()

print('Main Process Ended')

在這個例子里我們聲明了一個默認(rèn)為雙向的管道,然后將管道的兩端分別傳給兩個進(jìn)程。兩個進(jìn)程互相收發(fā)。觀察一下結(jié)果:

Producer Received: Consumer Words

Consumer Received: Producer Words

Main Process Ended

管道 Pipe 就像進(jìn)程之間搭建的橋梁,利用它我們就可以很方便地實現(xiàn)進(jìn)程間通信了。

13. 進(jìn)程池

為什么需要進(jìn)程池與線程池呢,我就用前面我們在進(jìn)行上下文切換的時候會有資源消耗,而在這個基礎(chǔ)上,創(chuàng)建線程與刪除線程都是需要消耗更多的資源。而這個池就節(jié)省了資源消耗,這樣我們就不用進(jìn)行創(chuàng)建和銷毀了,只要獲取里面的使用即可。這里我主要講一下進(jìn)程池與線程池的簡單用法。

在講之前,我們先看幾個簡單的例子:

第一種方法(多任務(wù)):

from multiprocessing import Pool

def function_square(data):

result = data*data

return result

if __name__ == '__main__':

inputs = [i for i in range(100)]

# inputs = (i for i in range(100))

# inputs = list(range(100))

pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會根據(jù)你電腦狀態(tài),自行創(chuàng)建。

# 按你的電腦自動創(chuàng)建相應(yīng)的數(shù)目

# map 把任務(wù)交給進(jìn)程池

# pool.map(function, iterable)

pool_outputs = pool.map(function_square, inputs)

# pool_outputs = pool.map(function_square, (2,3, 4, 5))

pool.close()

pool.join()

print("Pool :", pool_outputs)

第二種方法(單任務(wù)):

from multiprocessing import Pool

def function_square(data):

result = data*data

return result

if __name__ == '__main__':

pool = Pool(processes=4) # 如果你不指定數(shù)目的化,它就會根據(jù)你電腦狀態(tài),自行創(chuàng)建。(按你的電腦自動創(chuàng)建相應(yīng)的數(shù)目)

# map 把任務(wù)交給進(jìn)程池

# pool.map(function, iterable)

pool_outputs = pool.apply(function_square, args=(10, ))

pool.close()

pool.join()

print("Pool :", pool_outputs)

使用 from multiprocessing import Pool:引入進(jìn)程池 ,那這個進(jìn)程池,它是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請求提交到進(jìn)程池,如果這個進(jìn)程池還沒有滿的話,就創(chuàng)建新的進(jìn)程來執(zhí)行請求。 如果池滿的話,就會先等待。

# 那么,我們可以首先聲明這個進(jìn)程池;

# 然后,使用 map 方法,那其實這個 map 方法和正常的 map 方法是一致的。

# map:

# pool = Pool()

# pool.map(main, [i*10 for i in range(10)])

# 第一個參數(shù):他會將數(shù)組中的每一個元素拿出來,當(dāng)作函數(shù)的一個個參數(shù),然后創(chuàng)建一個個進(jìn)程,放到進(jìn)程池里面去運行。

# 第二個參數(shù):構(gòu)造一個數(shù)組,然后也就是 0 到 90 的這么一個循環(huán),那我們直接使用 list 構(gòu)造一下

接下來,我們來系統(tǒng)的講解一下,進(jìn)程池。

在前面,我們講了可以使用 Process 來創(chuàng)建進(jìn)程,同時也講了如何用 Semaphore 來控制進(jìn)程的并發(fā)執(zhí)行數(shù)量。

假如現(xiàn)在我們遇到這么一個問題,我有 10000 個任務(wù),每個任務(wù)需要啟動一個進(jìn)程來執(zhí)行,并且一個進(jìn)程運行完畢之后要緊接著啟動下一個進(jìn)程,同時我還需要控制進(jìn)程的并發(fā)數(shù)量,不能并發(fā)太高,不然 CPU 處理不過來(如果同時運行的進(jìn)程能維持在一個最高恒定值當(dāng)然利用率是最高的)。

那么我們該如何來實現(xiàn)這個需求呢?

用 Process 和 Semaphore 可以實現(xiàn),但是實現(xiàn)起來比較我們可以用 Process 和 Semaphore 解決問題,但是實現(xiàn)起來比較煩瑣。而這種需求在平時又是非常常見的。此時,我們就可以派上進(jìn)程池了,即 multiprocessing 中的 Pool。

Pool 可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到 pool 中時,如果池還沒有滿,就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,才會創(chuàng)建新的進(jìn)程來執(zhí)行它。

我們用一個實例來實現(xiàn)一下,代碼如下:

from multiprocessing import Pool

import time

def function(index):

print(f'Start process: {index}')

time.sleep(3)

print(f'End process {index}',)

if __name__ == '__main__':

pool = Pool(processes=3)# 不斷地改變該數(shù)字觀察輸出結(jié)果

for i in range(4):

pool.apply_async(function, args=(i, ))

print('Main Process started')

pool.close()

pool.join()

print("Main Process ended")

在這個例子中我們聲明了一個大小為 3 的進(jìn)程池,通過 processes 參數(shù)來指定,如果不指定,那么會自動根據(jù)處理器內(nèi)核來分配進(jìn)程數(shù)。接著我們使用 apply_async 方法將進(jìn)程添加進(jìn)去,args 可以用來傳遞參數(shù)。

運行結(jié)果如下:

Main Process started

Start process: 0

Start process: 1

Start process: 2

End process 0

End process 1

End process 2

Start process: 3

End process 3

Main Process ended

進(jìn)程池大小為 3,所以最初可以看到有 3 個進(jìn)程同時執(zhí)行,第進(jìn)程池大小為 3,所以最初可以看到有 3 個進(jìn)程同時執(zhí)行,第4個進(jìn)程在等待,在有進(jìn)程運行完畢之后,第4個進(jìn)程馬上跟著運行,出現(xiàn)了如上的運行效果。

最后,我們要記得調(diào)用 close 方法來關(guān)閉進(jìn)程池,使其不再接受新的任務(wù),然后調(diào)用 join 方法讓主進(jìn)程等待子進(jìn)程的退出,等子進(jìn)程運行完畢之后,主進(jìn)程接著運行并結(jié)束。

不過上面的寫法多少有些煩瑣,這里再介紹進(jìn)程池一個更好用的 map 方法,可以將上述寫法簡化很多。

map 方法是怎么用的呢?

第一個參數(shù)就是要啟動的進(jìn)程對應(yīng)的執(zhí)行方法,第 2 個參數(shù)是一個可迭代對象,其中的每個元素會被傳遞給這個執(zhí)行方法。

舉個例子:

現(xiàn)在我們有一個 list,里面包含了很多 URL,另外我們也定義了一個方法用來抓取每個 URL 內(nèi)容并解析,那么我們可以直接在 map 的第一個參數(shù)傳入方法名,第 2 個參數(shù)傳入 URL 數(shù)組。

我們用一個實例來感受一下:

from multiprocessing import Pool

import urllib.request

import urllib.error

def scrape(url):

try:

urllib.request.urlopen(url)

print(f'URL {url} Scraped')

except (urllib.error.HTTPError, urllib.error.URLError):

print(f'URL {url} not Scraped')

if __name__ == '__main__':

pool = Pool(processes=3)

urls = [

'https://www.baidu.com',

'http://www.meituan.com/',

'https://www.aiyc.top/',

'http://xxxyxxx.net',

]

pool.map(scrape, urls)

pool.close()

這個例子中我們先定義了一個 scrape 方法,它接收一個參數(shù) url,這里就是請求了一下這個鏈接,然后輸出爬取成功的信息,如果發(fā)生錯誤,則會輸出爬取失敗的信息。

首先我們要初始化一個 Pool,指定進(jìn)程數(shù)為 3。然后我們聲明一個 urls 列表,接著我們調(diào)用了 map 方法,第 1 個參數(shù)就是進(jìn)程對應(yīng)的執(zhí)行方法,第 2 個參數(shù)就是 urls 列表,map 方法會依次將 urls 的每個元素作為 scrape 的參數(shù)傳遞并啟動一個新的進(jìn)程,加到進(jìn)程池中執(zhí)行。

運行結(jié)果如下:

URL https://www.baidu.com Scraped

URL http://xxxyxxx.net not Scraped

URL https://www.aiyc.top/ Scraped

URL http://www.meituan.com/ Scraped

這樣,我們就可以實現(xiàn) 3 個進(jìn)程并行運行。不同的進(jìn)程相互獨立地輸出了對應(yīng)的爬取結(jié)果。

可以看到,我們利用 Pool 的 map 方法非常方便地實現(xiàn)了多進(jìn)程的執(zhí)行。后面我們也會在實戰(zhàn)案例中結(jié)合進(jìn)程池來實現(xiàn)數(shù)據(jù)的爬取。

以上便是 Python 中多進(jìn)程的基本用法,本節(jié)內(nèi)容比較多,后面的實戰(zhàn)案例也會用到這些內(nèi)容,需要好好掌握。

14. 實戰(zhàn)(貓眼 TOP100 + re + multiprocessing)

# !/usr/bin/python3

# -*- coding: utf-8 -*-

# @Author:AI悅創(chuàng) @DateTime :2020/2/12 15:23 @Function :功能 Development_tool :PyCharm

# code is far away from bugs with the god animal protecting

# I love animals. They taste delicious.

# https://maoyan.com/board/4?offset=0

# https://maoyan.com/board/4?offset=10

# https://maoyan.com/board/4?offset=20

# https://maoyan.com/board/4?offset=30

import requests,re,json

from requests.exceptions import RequestException

from multiprocessing import Pool # 引入進(jìn)程池

headers = {

'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.87 Safari/537.36'

}

session = requests.Session()

session.headers = headers

def get_one_page(url):

try:

response = session.get(url)

if response.status_code == 200:

return response.text

return None

except RequestException:

return None

def parse_one_page(html):

pattern = re.compile('

.*?board-index.*?>(\d+).*?data-src="(.*?)".*?name.*?>

+'.*?>(.*?).*?star">(.*?)

.*?releasetime">(.*?)

.*?integer">'

+'(.*?).*?fraction">(.*?).*?

', re.S)

# 標(biāo)簽的開始和結(jié)尾都要寫出來!!!

items = re.findall(pattern, html)

# 使用 yield 把這個方法變成一個生成器

# 要把返回的結(jié)果做成一個鍵值對的形式

for item in items:

yield {

'index': item[0],

'image': item[1],

'title': item[2],

'actor': item[3].strip()[17:],

'time': item[4][5:],

'score': item[5]+item[6]

}

def write_to_file(content):

# print(type(content))

# with open('result.txt', 'a') as f:

with open('result.txt', 'a', encoding='utf-8') as f:

# 字典轉(zhuǎn)換成字符串

# f.write(json.dumps(content) + '\n') # 中文編碼變成 Unicode

f.write(json.dumps(content, ensure_ascii=False) + '\n')

f.close()

def main(offset):

url = f'https://maoyan.com/board/4?offset={offset}'

html = get_one_page(url)

for item in parse_one_page(html):

print(item)

write_to_file(item)

# 1.0

# if __name__ == '__main__':

# for i in range(10): # range(0, 100, 10)

# main(i*10)

# 2.0

if __name__ == '__main__':

pool = Pool()

pool.map(main, [i*10 for i in range(10)])

# 優(yōu)化,如果你要秒抓的話,使用 from multiprocessing import Pool # 引入進(jìn)程池 ,當(dāng)然我們目的不是秒抓,而是學(xué)習(xí)一下多進(jìn)程的用法

# 那么這個進(jìn)程池,他是可以可以提供指定數(shù)量進(jìn)程池,如果有新的請求提交到進(jìn)程池,如果這個進(jìn)程池還沒有滿的話,就創(chuàng)建新的進(jìn)程來執(zhí)行請求。

# 如果池滿的話,就會先等待

# 那么,我們可以首先聲明這個進(jìn)程池;

# 然后,使用 map 方法,那其實這個 map 方法和正常的 map 方法是一致的。

# map:

# pool = Pool()

# pool.map(main, [i*10 for i in range(10)])

# 第一個參數(shù):他會將數(shù)組中的每一個元素拿出來,當(dāng)作函數(shù)的一個個參數(shù),然后創(chuàng)建一個個進(jìn)程,放到進(jìn)程池里面去運行。

# 第二個參數(shù):構(gòu)造一個數(shù)組,然后也就是 0 到 90 的這么一個循環(huán),那我們直接使用 list 構(gòu)造一下

15. 補(bǔ)充:線程池

我找了許多包,這個包還是不錯的:Pip install threadpool

# project = 'Code', file_name = '線程池', author = 'AI悅創(chuàng)'

# time = '2020/3/3 0:05', product_name = PyCharm

# code is far away from bugs with the god animal protecting

# I love animals. They taste delicious.

import time

import threadpool

# 執(zhí)行比較耗時的函數(shù),需要開多線程

def get_html(url):

time.sleep(3)

print(url)

# 按原本的單線程運行時間為:300s

# 而多線程池的化:30s

# 使用多線程執(zhí)行 telent 函數(shù)

urls = [i for i in range(100)]

pool = threadpool.ThreadPool(10) # 建立線程池

# 提交任務(wù)給線程池

requests = threadpool.makeRequests(get_html, urls)

# 開始執(zhí)行任務(wù)

for req in requests:

pool.putRequest(req)

pool.wait()

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
換一批
延伸閱讀

9月2日消息,不造車的華為或?qū)⒋呱龈蟮莫毥谦F公司,隨著阿維塔和賽力斯的入局,華為引望愈發(fā)顯得引人矚目。

關(guān)鍵字: 阿維塔 塞力斯 華為

加利福尼亞州圣克拉拉縣2024年8月30日 /美通社/ -- 數(shù)字化轉(zhuǎn)型技術(shù)解決方案公司Trianz今天宣布,該公司與Amazon Web Services (AWS)簽訂了...

關(guān)鍵字: AWS AN BSP 數(shù)字化

倫敦2024年8月29日 /美通社/ -- 英國汽車技術(shù)公司SODA.Auto推出其旗艦產(chǎn)品SODA V,這是全球首款涵蓋汽車工程師從創(chuàng)意到認(rèn)證的所有需求的工具,可用于創(chuàng)建軟件定義汽車。 SODA V工具的開發(fā)耗時1.5...

關(guān)鍵字: 汽車 人工智能 智能驅(qū)動 BSP

北京2024年8月28日 /美通社/ -- 越來越多用戶希望企業(yè)業(yè)務(wù)能7×24不間斷運行,同時企業(yè)卻面臨越來越多業(yè)務(wù)中斷的風(fēng)險,如企業(yè)系統(tǒng)復(fù)雜性的增加,頻繁的功能更新和發(fā)布等。如何確保業(yè)務(wù)連續(xù)性,提升韌性,成...

關(guān)鍵字: 亞馬遜 解密 控制平面 BSP

8月30日消息,據(jù)媒體報道,騰訊和網(wǎng)易近期正在縮減他們對日本游戲市場的投資。

關(guān)鍵字: 騰訊 編碼器 CPU

8月28日消息,今天上午,2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會開幕式在貴陽舉行,華為董事、質(zhì)量流程IT總裁陶景文發(fā)表了演講。

關(guān)鍵字: 華為 12nm EDA 半導(dǎo)體

8月28日消息,在2024中國國際大數(shù)據(jù)產(chǎn)業(yè)博覽會上,華為常務(wù)董事、華為云CEO張平安發(fā)表演講稱,數(shù)字世界的話語權(quán)最終是由生態(tài)的繁榮決定的。

關(guān)鍵字: 華為 12nm 手機(jī) 衛(wèi)星通信

要點: 有效應(yīng)對環(huán)境變化,經(jīng)營業(yè)績穩(wěn)中有升 落實提質(zhì)增效舉措,毛利潤率延續(xù)升勢 戰(zhàn)略布局成效顯著,戰(zhàn)新業(yè)務(wù)引領(lǐng)增長 以科技創(chuàng)新為引領(lǐng),提升企業(yè)核心競爭力 堅持高質(zhì)量發(fā)展策略,塑強(qiáng)核心競爭優(yōu)勢...

關(guān)鍵字: 通信 BSP 電信運營商 數(shù)字經(jīng)濟(jì)

北京2024年8月27日 /美通社/ -- 8月21日,由中央廣播電視總臺與中國電影電視技術(shù)學(xué)會聯(lián)合牽頭組建的NVI技術(shù)創(chuàng)新聯(lián)盟在BIRTV2024超高清全產(chǎn)業(yè)鏈發(fā)展研討會上宣布正式成立。 活動現(xiàn)場 NVI技術(shù)創(chuàng)新聯(lián)...

關(guān)鍵字: VI 傳輸協(xié)議 音頻 BSP

北京2024年8月27日 /美通社/ -- 在8月23日舉辦的2024年長三角生態(tài)綠色一體化發(fā)展示范區(qū)聯(lián)合招商會上,軟通動力信息技術(shù)(集團(tuán))股份有限公司(以下簡稱"軟通動力")與長三角投資(上海)有限...

關(guān)鍵字: BSP 信息技術(shù)
關(guān)閉
關(guān)閉