深入探秘 Netty、Kafka 中的零拷貝技術(shù)!
掃描二維碼
隨時(shí)隨地手機(jī)看文章
作者:ksfzhaohui
原文:juejin.im/post/5cad6f1ef265da039f0ef5df
零拷貝,從字面意思理解就是數(shù)據(jù)不需要來(lái)回的拷貝,大大提升了系統(tǒng)的性能。我們也經(jīng)常在 Java NIO,Netty,Kafka,RocketMQ 等框架中聽(tīng)到零拷貝,它經(jīng)常作為其提升性能的一大亮點(diǎn)
下面從 I/O 的幾個(gè)概念開(kāi)始,進(jìn)而再分析零拷貝。
I/O 概念
緩沖區(qū)
緩沖區(qū)是所有 I/O 的基礎(chǔ),I/O 講的無(wú)非就是把數(shù)據(jù)移進(jìn)或移出緩沖區(qū);進(jìn)程執(zhí)行 I/O 操作,就是向操作系統(tǒng)發(fā)出請(qǐng)求,讓它要么把緩沖區(qū)的數(shù)據(jù)排干(寫(xiě)),要么填充緩沖區(qū)(讀)。
下面看一個(gè) Java 進(jìn)程發(fā)起 Read 請(qǐng)求加載數(shù)據(jù)大致的流程圖:
進(jìn)程發(fā)起 Read 請(qǐng)求之后,內(nèi)核接收到 Read 請(qǐng)求之后,會(huì)先檢查內(nèi)核空間中是否已經(jīng)存在進(jìn)程所需要的數(shù)據(jù),如果已經(jīng)存在,則直接把數(shù)據(jù) Copy 給進(jìn)程的緩沖區(qū)。
如果沒(méi)有內(nèi)核隨即向磁盤(pán)控制器發(fā)出命令,要求從磁盤(pán)讀取數(shù)據(jù),磁盤(pán)控制器把數(shù)據(jù)直接寫(xiě)入內(nèi)核 Read 緩沖區(qū),這一步通過(guò) DMA 完成。
接下來(lái)就是內(nèi)核將數(shù)據(jù) Copy 到進(jìn)程的緩沖區(qū);如果進(jìn)程發(fā)起 Write 請(qǐng)求,同樣需要把用戶(hù)緩沖區(qū)里面的數(shù)據(jù) Copy 到內(nèi)核的 Socket 緩沖區(qū)里面,然后再通過(guò) DMA 把數(shù)據(jù) Copy 到網(wǎng)卡中,發(fā)送出去。
你可能覺(jué)得這樣挺浪費(fèi)空間的,每次都需要把內(nèi)核空間的數(shù)據(jù)拷貝到用戶(hù)空間中,所以零拷貝的出現(xiàn)就是為了解決這種問(wèn)題的。
關(guān)于零拷貝提供了兩種方式分別是:
mmap+write
Sendfile
虛擬內(nèi)存
所有現(xiàn)代操作系統(tǒng)都使用虛擬內(nèi)存,使用虛擬的地址取代物理地址,這樣做的好處是:
一個(gè)以上的虛擬地址可以指向同一個(gè)物理內(nèi)存地址。
虛擬內(nèi)存空間可大于實(shí)際可用的物理地址。
利用第一條特性可以把內(nèi)核空間地址和用戶(hù)空間的虛擬地址映射到同一個(gè)物理地址,這樣 DMA 就可以填充對(duì)內(nèi)核和用戶(hù)空間進(jìn)程同時(shí)可見(jiàn)的緩沖區(qū)了。
大致如下圖所示:
省去了內(nèi)核與用戶(hù)空間的往來(lái)拷貝,Java 也利用操作系統(tǒng)的此特性來(lái)提升性能,下面重點(diǎn)看看 Java 對(duì)零拷貝都有哪些支持。
mmap+write 方式
使用 mmap+write 方式代替原來(lái)的 read+write 方式,mmap 是一種內(nèi)存映射文件的方法,即將一個(gè)文件或者其他對(duì)象映射到進(jìn)程的地址空間,實(shí)現(xiàn)文件磁盤(pán)地址和進(jìn)程虛擬地址空間中一段虛擬地址的一一對(duì)應(yīng)關(guān)系。
這樣就可以省掉原來(lái)內(nèi)核 Read 緩沖區(qū) Copy 數(shù)據(jù)到用戶(hù)緩沖區(qū),但是還是需要內(nèi)核 Read 緩沖區(qū)將數(shù)據(jù) Copy 到內(nèi)核 Socket 緩沖區(qū)。
大致如下圖所示:
Sendfile?方式
Sendfile 系統(tǒng)調(diào)用在內(nèi)核版本 2.1 中被引入,目的是簡(jiǎn)化通過(guò)網(wǎng)絡(luò)在兩個(gè)通道之間進(jìn)行的數(shù)據(jù)傳輸過(guò)程。
Sendfile 系統(tǒng)調(diào)用的引入,不僅減少了數(shù)據(jù)復(fù)制,還減少了上下文切換的次數(shù),大致如下圖所示:
數(shù)據(jù)傳送只發(fā)生在內(nèi)核空間,所以減少了一次上下文切換;但是還是存在一次 Copy,能不能把這一次 Copy 也省略掉?
Linux2.4 內(nèi)核中做了改進(jìn),將 Kernel buffer 中對(duì)應(yīng)的數(shù)據(jù)描述信息(內(nèi)存地址,偏移量)記錄到相應(yīng)的 Socket 緩沖區(qū)當(dāng)中,這樣連內(nèi)核空間中的一次 CPU Copy 也省掉了。
Java 零拷貝
MappedByteBuffer
Java NIO 提供的 FileChannel 提供了 map() 方法,該方法可以在一個(gè)打開(kāi)的文件和 MappedByteBuffer 之間建立一個(gè)虛擬內(nèi)存映射。
MappedByteBuffer 繼承于 ByteBuffer,類(lèi)似于一個(gè)基于內(nèi)存的緩沖區(qū),只不過(guò)該對(duì)象的數(shù)據(jù)元素存儲(chǔ)在磁盤(pán)的一個(gè)文件中。
調(diào)用 get() 方法會(huì)從磁盤(pán)中獲取數(shù)據(jù),此數(shù)據(jù)反映該文件當(dāng)前的內(nèi)容,調(diào)用 put() 方法會(huì)更新磁盤(pán)上的文件,并且對(duì)文件做的修改對(duì)其他閱讀者也是可見(jiàn)的。
下面看一個(gè)簡(jiǎn)單的讀取實(shí)例,然后再對(duì) MappedByteBuffer 進(jìn)行分析:
public?class?MappedByteBufferTest?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????File?file?=?new?File("D://db.txt");
????????long?len?=?file.length();
????????byte[]?ds?=?new?byte[(int)?len];
????????MappedByteBuffer?mappedByteBuffer?=?new?FileInputStream(file).getChannel().map(FileChannel.MapMode.READ_ONLY,?0,
????????????????len);
????????for?(int?offset?=?0;?offset?????????????byte?b?=?mappedByteBuffer.get();
????????????ds[offset]?=?b;
????????}
????????Scanner?scan?=?new?Scanner(new?ByteArrayInputStream(ds)).useDelimiter("?");
????????while?(scan.hasNext())?{
????????????System.out.print(scan.next()?+?"?");
????????}
????}
}
????public?abstract?MappedByteBuffer?map(MapMode?mode,
?????????????????????????????????????????long?position,?long?size)
????????throws?IOException;
分別提供了三個(gè)參數(shù),MapMode,Position 和 Size,分別表示:
MapMode:映射的模式,可選項(xiàng)包括:READ_ONLY,READ_WRITE,PRIVATE。
Position:從哪個(gè)位置開(kāi)始映射,字節(jié)數(shù)的位置。
Size:從 Position 開(kāi)始向后多少個(gè)字節(jié)。
重點(diǎn)看一下 MapMode,前兩個(gè)分別表示只讀和可讀可寫(xiě),當(dāng)然請(qǐng)求的映射模式受到 Filechannel 對(duì)象的訪(fǎng)問(wèn)權(quán)限限制,如果在一個(gè)沒(méi)有讀權(quán)限的文件上啟用 READ_ONLY,將拋出 NonReadableChannelException。
PRIVATE 模式表示寫(xiě)時(shí)拷貝的映射,意味著通過(guò) put() 方法所做的任何修改都會(huì)導(dǎo)致產(chǎn)生一個(gè)私有的數(shù)據(jù)拷貝并且該拷貝中的數(shù)據(jù)只有 MappedByteBuffer 實(shí)例可以看到。
該過(guò)程不會(huì)對(duì)底層文件做任何修改,而且一旦緩沖區(qū)被施以垃圾收集動(dòng)作(garbage collected),那些修改都會(huì)丟失。
大致瀏覽一下 map() 方法的源碼:
????public?MappedByteBuffer?map(MapMode?mode,?long?position,?long?size)
????????throws?IOException
????{
????????????...省略...
????????????int?pagePosition?=?(int)(position?%?allocationGranularity);
????????????long?mapPosition?=?position?-?pagePosition;
????????????long?mapSize?=?size?+?pagePosition;
????????????try?{
????????????????//?If?no?exception?was?thrown?from?map0,?the?address?is?valid
????????????????addr?=?map0(imode,?mapPosition,?mapSize);
????????????}?catch?(OutOfMemoryError?x)?{
????????????????//?An?OutOfMemoryError?may?indicate?that?we've?exhausted?memory
????????????????//?so?force?gc?and?re-attempt?map
????????????????System.gc();
????????????????try?{
????????????????????Thread.sleep(100);
????????????????}?catch?(InterruptedException?y)?{
????????????????????Thread.currentThread().interrupt();
????????????????}
????????????????try?{
????????????????????addr?=?map0(imode,?mapPosition,?mapSize);
????????????????}?catch?(OutOfMemoryError?y)?{
????????????????????//?After?a?second?OOME,?fail
????????????????????throw?new?IOException("Map?failed",?y);
????????????????}
????????????}
????????????//?On?Windows,?and?potentially?other?platforms,?we?need?an?open
????????????//?file?descriptor?for?some?mapping?operations.
????????????FileDescriptor?mfd;
????????????try?{
????????????????mfd?=?nd.duplicateForMapping(fd);
????????????}?catch?(IOException?ioe)?{
????????????????unmap0(addr,?mapSize);
????????????????throw?ioe;
????????????}
????????????assert?(IOStatus.checkAll(addr));
????????????assert?(addr?%?allocationGranularity?==?0);
????????????int?isize?=?(int)size;
????????????Unmapper?um?=?new?Unmapper(addr,?mapSize,?isize,?mfd);
????????????if?((!writable)?||?(imode?==?MAP_RO))?{
????????????????return?Util.newMappedByteBufferR(isize,
?????????????????????????????????????????????????addr?+?pagePosition,
?????????????????????????????????????????????????mfd,
?????????????????????????????????????????????????um);
????????????}?else?{
????????????????return?Util.newMappedByteBuffer(isize,
????????????????????????????????????????????????addr?+?pagePosition,
????????????????????????????????????????????????mfd,
????????????????????????????????????????????????um);
????????????}
?????}
大致意思就是通過(guò) Native 方法獲取內(nèi)存映射的地址,如果失敗,手動(dòng) GC 再次映射。
最后通過(guò)內(nèi)存映射的地址實(shí)例化出 MappedByteBuffer,MappedByteBuffer 本身是一個(gè)抽象類(lèi),其實(shí)這里真正實(shí)例化出來(lái)的是 DirectByteBuffer。
DirectByteBuffer
DirectByteBuffer 繼承于 MappedByteBuffer,從名字就可以猜測(cè)出開(kāi)辟了一段直接的內(nèi)存,并不會(huì)占用 JVM 的內(nèi)存空間。
上一節(jié)中通過(guò) Filechannel 映射出的 MappedByteBuffer 其實(shí)際也是 DirectByteBuffer,當(dāng)然除了這種方式,也可以手動(dòng)開(kāi)辟一段空間:
ByteBuffer?directByteBuffer?=?ByteBuffer.allocateDirect(100);
如上開(kāi)辟了 100 字節(jié)的直接內(nèi)存空間。
Channel-to-Channel 傳輸
經(jīng)常需要從一個(gè)位置將文件傳輸?shù)搅硗庖粋€(gè)位置,F(xiàn)ileChannel 提供了 transferTo() 方法用來(lái)提高傳輸?shù)男?,首先看一個(gè)簡(jiǎn)單的實(shí)例:
public?class?ChannelTransfer?{
????public?static?void?main(String[]?argv)?throws?Exception?{
????????String?files[]=new?String[1];
????????files[0]="D://db.txt";
????????catFiles(Channels.newChannel(System.out),?files);
????}
????private?static?void?catFiles(WritableByteChannel?target,?String[]?files)
????????????throws?Exception?{
????????for?(int?i?=?0;?i?????????????FileInputStream?fis?=?new?FileInputStream(files[i]);
????????????FileChannel?channel?=?fis.getChannel();
????????????channel.transferTo(0,?channel.size(),?target);
????????????channel.close();
????????????fis.close();
????????}
????}
}
通過(guò) FileChannel 的 transferTo() 方法將文件數(shù)據(jù)傳輸?shù)?System.out 通道,接口定義如下:
????public?abstract?long?transferTo(long?position,?long?count,
????????????????????????????????????WritableByteChannel?target)
????????throws?IOException;
幾個(gè)參數(shù)也比較好理解,分別是開(kāi)始傳輸?shù)奈恢?,傳輸?shù)淖止?jié)數(shù),以及目標(biāo)通道;transferTo() 允許將一個(gè)通道交叉連接到另一個(gè)通道,而不需要一個(gè)中間緩沖區(qū)來(lái)傳遞數(shù)據(jù)。
注:這里不需要中間緩沖區(qū)有兩層意思:第一層不需要用戶(hù)空間緩沖區(qū)來(lái)拷貝內(nèi)核緩沖區(qū),另外一層兩個(gè)通道都有自己的內(nèi)核緩沖區(qū),兩個(gè)內(nèi)核緩沖區(qū)也可以做到無(wú)需拷貝數(shù)據(jù)。
Netty 零拷貝
Netty 提供了零拷貝的 Buffer,在傳輸數(shù)據(jù)時(shí),最終處理的數(shù)據(jù)會(huì)需要對(duì)單個(gè)傳輸?shù)膱?bào)文,進(jìn)行組合和拆分,NIO 原生的 ByteBuffer 無(wú)法做到
Netty 通過(guò)提供的 Composite(組合)和 Slice(拆分)兩種 Buffer 來(lái)實(shí)現(xiàn)零拷貝。
看下面一張圖會(huì)比較清晰:
TCP 層 HTTP 報(bào)文被分成了兩個(gè) ChannelBuffer,這兩個(gè) Buffer 對(duì)我們上層的邏輯(HTTP 處理)是沒(méi)有意義的。
但是兩個(gè) ChannelBuffer 被組合起來(lái),就成為了一個(gè)有意義的 HTTP 報(bào)文,這個(gè)報(bào)文對(duì)應(yīng)的 ChannelBuffer,才是能稱(chēng)之為“Message”的東西,這里用到了一個(gè)詞“Virtual Buffer”。
可以看一下 Netty 提供的 CompositeChannelBuffer 源碼:
public?class?CompositeChannelBuffer?extends?AbstractChannelBuffer?{
????private?final?ByteOrder?order;
????private?ChannelBuffer[]?components;
????private?int[]?indices;
????private?int?lastAccessedComponentId;
????private?final?boolean?gathering;
????public?byte?getByte(int?index)?{
????????int?componentId?=?componentId(index);
????????return?components[componentId].getByte(index?-?indices[componentId]);
????}
????...省略...
Components 用來(lái)保存的就是所有接收到的 Buffer,Indices 記錄每個(gè) buffer 的起始位置,lastAccessedComponentId 記錄上一次訪(fǎng)問(wèn)的 ComponentId。
CompositeChannelBuffer 并不會(huì)開(kāi)辟新的內(nèi)存并直接復(fù)制所有 ChannelBuffer 內(nèi)容,而是直接保存了所有 ChannelBuffer 的引用,并在子 ChannelBuffer 里進(jìn)行讀寫(xiě),實(shí)現(xiàn)了零拷貝。
其他零拷貝
RocketMQ 的消息采用順序?qū)懙?commitlog 文件,然后利用 consume queue 文件作為索引。
RocketMQ 采用零拷貝 mmap+write 的方式來(lái)回應(yīng) Consumer 的請(qǐng)求。
同樣 Kafka 中存在大量的網(wǎng)絡(luò)數(shù)據(jù)持久化到磁盤(pán)和磁盤(pán)文件通過(guò)網(wǎng)絡(luò)發(fā)送的過(guò)程,Kafka使用了 Sendfile 零拷貝方式。
總結(jié)
零拷貝如果簡(jiǎn)單用 Java 里面對(duì)象的概率來(lái)理解的話(huà),其實(shí)就是使用的都是對(duì)象的引用,每個(gè)引用對(duì)象的地方對(duì)其改變就都能改變此對(duì)象,永遠(yuǎn)只存在一份對(duì)象。
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒(méi)關(guān)注的小伙伴,可以長(zhǎng)按關(guān)注一下:
長(zhǎng)按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場(chǎng),如有問(wèn)題,請(qǐng)聯(lián)系我們,謝謝!