零拷貝(Zero-copy)及其應(yīng)用詳解
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

前言
零拷貝(Zero-copy)是一種高效的數(shù)據(jù)傳輸機(jī)制,在追求低延遲的傳輸場(chǎng)景中十分常用。本文先通過(guò)傳統(tǒng)方案引出零拷貝機(jī)制,然后分析其細(xì)節(jié),最后介紹它的部分應(yīng)用。文中涉及到的操作系統(tǒng)理論知識(shí)都可以參考英文維基或者相關(guān)書(shū)籍,如Abraham Silberschatz著《操作系統(tǒng)概念》、Andrew S. Tanenbaum著《現(xiàn)代操作系統(tǒng)》等。
傳統(tǒng)的數(shù)據(jù)傳輸方法
在互聯(lián)網(wǎng)時(shí)代,從某臺(tái)機(jī)器將一份數(shù)據(jù)(比如一個(gè)文件)通過(guò)網(wǎng)絡(luò)傳輸?shù)搅硗庖慌_(tái)機(jī)器,是再平常不過(guò)的事情了。如果按照一般的思路,用Java語(yǔ)言來(lái)描述發(fā)送端的邏輯,大致如下。
Socket socket = new Socket(HOST, PORT);
InputStream inputStream = new FileInputStream(FILE_PATH);
OutputStream outputStream = new DataOutputStream(socket.getOutputStream());
byte[] buffer = new byte[4096];
while (inputStream.read(buffer) >= 0) {
outputStream.write(buffer);
}
outputStream.close();
socket.close();
inputStream.close();看起來(lái)當(dāng)然是很簡(jiǎn)單的。但是如果我們深入到操作系統(tǒng)的層面,就會(huì)發(fā)現(xiàn)實(shí)際的微觀操作要更復(fù)雜,具體來(lái)說(shuō)有以下步驟:
JVM向OS發(fā)出read()系統(tǒng)調(diào)用,觸發(fā)上下文切換,從用戶態(tài)切換到內(nèi)核態(tài)。
從外部存儲(chǔ)(如硬盤(pán))讀取文件內(nèi)容,通過(guò)直接內(nèi)存訪問(wèn)(DMA)存入內(nèi)核地址空間的緩沖區(qū)。
將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間緩沖區(qū),read()系統(tǒng)調(diào)用返回,并從內(nèi)核態(tài)切換回用戶態(tài)。
JVM向OS發(fā)出write()系統(tǒng)調(diào)用,觸發(fā)上下文切換,從用戶態(tài)切換到內(nèi)核態(tài)。
將數(shù)據(jù)從用戶緩沖區(qū)拷貝到內(nèi)核中與目的地Socket關(guān)聯(lián)的緩沖區(qū)。
數(shù)據(jù)最終經(jīng)由Socket通過(guò)DMA傳送到硬件(如網(wǎng)卡)緩沖區(qū),write()系統(tǒng)調(diào)用返回,并從內(nèi)核態(tài)切換回用戶態(tài)。
如果語(yǔ)言描述看起來(lái)有些亂的話,通過(guò)時(shí)序圖描述會(huì)更清楚一些。
傳統(tǒng)方法的時(shí)序圖
到了這一步,你是否覺(jué)得簡(jiǎn)單的代碼邏輯下隱藏著很累贅的東西了?事實(shí)也確實(shí)如此,這個(gè)過(guò)程一共發(fā)生了4次上下文切換(嚴(yán)格來(lái)講是模式切換),并且數(shù)據(jù)也被來(lái)回拷貝了4次。如果忽略掉系統(tǒng)調(diào)用的細(xì)節(jié),整個(gè)過(guò)程可以用下面的兩張簡(jiǎn)圖表示。
傳統(tǒng)方法的流程框圖
傳統(tǒng)方法的上下文切換過(guò)程
我們都知道,上下文切換是CPU密集型的工作,數(shù)據(jù)拷貝是I/O密集型的工作。如果一次簡(jiǎn)單的傳輸就要像上面這樣復(fù)雜的話,效率是相當(dāng)?shù)拖碌?。零拷貝機(jī)制的終極目標(biāo),就是消除冗余的上下文切換和數(shù)據(jù)拷貝,提高效率。
零拷貝的數(shù)據(jù)傳輸方法
“基礎(chǔ)的”零拷貝機(jī)制
通過(guò)上面的分析可以看出,第2、3次拷貝(也就是從內(nèi)核空間到用戶空間的來(lái)回復(fù)制)是沒(méi)有意義的,數(shù)據(jù)應(yīng)該可以直接從內(nèi)核緩沖區(qū)直接送入Socket緩沖區(qū)。零拷貝機(jī)制就實(shí)現(xiàn)了這一點(diǎn)。不過(guò)零拷貝需要由操作系統(tǒng)直接支持,不同OS有不同的實(shí)現(xiàn)方法。大多數(shù)Unix-like系統(tǒng)都是提供了一個(gè)名為sendfile()的系統(tǒng)調(diào)用,在其man page中,就有這樣的描述:
sendfile() copies data between one file descriptor and another.
Because this copying is done within the kernel, sendfile() is more efficient than the combination of read(2) and write(2), which would require transferring data to and from user space.下面是零拷貝機(jī)制下,數(shù)據(jù)傳輸?shù)臅r(shí)序圖。
零拷貝方法的時(shí)序圖
可見(jiàn)確實(shí)是消除了從內(nèi)核空間到用戶空間的來(lái)回復(fù)制,因此“zero-copy”這個(gè)詞實(shí)際上是站在內(nèi)核的角度來(lái)說(shuō)的,并不是完全不會(huì)發(fā)生任何拷貝。
在Java NIO包中提供了零拷貝機(jī)制對(duì)應(yīng)的API,即FileChannel.transferTo()方法。不過(guò)FileChannel類(lèi)是抽象類(lèi),transferTo()也是一個(gè)抽象方法,因此還要依賴(lài)于具體實(shí)現(xiàn)。FileChannel的實(shí)現(xiàn)類(lèi)并不在JDK本身,而位于sun.nio.ch.FileChannelImpl類(lèi)中,零拷貝的具體實(shí)現(xiàn)自然也都是native方法,看官如有興趣可以自行查找源碼來(lái)看,這里不再贅述。
將傳統(tǒng)方式的發(fā)送端邏輯改寫(xiě)一下,大致如下。
SocketAddress socketAddress = new InetSocketAddress(HOST, PORT);
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(socketAddress);
File file = new File(FILE_PATH);
FileChannel fileChannel = new FileInputStream(file).getChannel();
fileChannel.transferTo(0, file.length(), socketChannel);
fileChannel.close();
socketChannel.close();借助transferTo()方法的話,整個(gè)過(guò)程就可以用下面的簡(jiǎn)圖表示了。
零拷貝方法的流程框圖
零拷貝方法的上下文切換過(guò)程
可見(jiàn),不僅拷貝的次數(shù)變成了3次,上下文切換的次數(shù)也減少到了2次,效率比傳統(tǒng)方式高了很多。但是它還并非完美狀態(tài),下面看一看讓它變得更優(yōu)化的方法。
對(duì)Scatter/Gather的支持
在“基礎(chǔ)”零拷貝方式的時(shí)序圖中,有一個(gè)“write data to target socket buffer”的回環(huán),在框圖中也有一個(gè)從“Read buffer”到“Socket buffer”的大箭頭。這是因?yàn)樵谝话愕腂lock DMA方式中,源物理地址和目標(biāo)物理地址都得是連續(xù)的,所以一次只能傳輸物理上連續(xù)的一塊數(shù)據(jù),每傳輸一個(gè)塊發(fā)起一次中斷,直到傳輸完成,所以必須要在兩個(gè)緩沖區(qū)之間拷貝數(shù)據(jù)。
而Scatter/Gather DMA方式則不同,會(huì)預(yù)先維護(hù)一個(gè)物理上不連續(xù)的塊描述符的鏈表,描述符中包含有數(shù)據(jù)的起始地址和長(zhǎng)度。傳輸時(shí)只需要遍歷鏈表,按序傳輸數(shù)據(jù),全部完成后發(fā)起一次中斷即可,效率比Block DMA要高。也就是說(shuō),硬件可以通過(guò)Scatter/Gather DMA直接從內(nèi)核緩沖區(qū)中取得全部數(shù)據(jù),不需要再?gòu)膬?nèi)核緩沖區(qū)向Socket緩沖區(qū)拷貝數(shù)據(jù)。因此上面的時(shí)序圖還可以進(jìn)一步簡(jiǎn)化。
支持Scatter/Gather的零拷貝時(shí)序圖
這就是完全體的零拷貝機(jī)制了,是不是清爽了很多?相對(duì)地,它的流程框圖如下。
支持Scatter/Gather的零拷貝流程框圖
對(duì)內(nèi)存映射(mmap)的支持
上面講的機(jī)制看起來(lái)一切都很好,但它還是有個(gè)缺點(diǎn):如果我想在傳輸時(shí)修改數(shù)據(jù)本身,就無(wú)能為力了。不過(guò),很多操作系統(tǒng)也提供了內(nèi)存映射機(jī)制,對(duì)應(yīng)的系統(tǒng)調(diào)用為mmap()/munmap()。通過(guò)它可以將文件數(shù)據(jù)映射到內(nèi)核地址空間,直接進(jìn)行操作,操作完之后再刷回去。其對(duì)應(yīng)的簡(jiǎn)要時(shí)序圖如下。
支持mmap的零拷貝時(shí)序圖
當(dāng)然,天下沒(méi)有免費(fèi)的午餐,上面的過(guò)程仍然會(huì)發(fā)生4次上下文切換。另外,它需要在快表(TLB)中始終維護(hù)著所有數(shù)據(jù)對(duì)應(yīng)的地址空間,直到刷寫(xiě)完成,因此處理缺頁(yè)的overhead也會(huì)更大。在使用該機(jī)制時(shí),需要權(quán)衡效率。
NIO框架中提供了MappedByteBuffer用來(lái)支持mmap。它與常用的DirectByteBuffer一樣,都是在堆外內(nèi)存分配空間。相對(duì)地,HeapByteBuffer在堆內(nèi)內(nèi)存分配空間。
零拷貝機(jī)制的應(yīng)用
零拷貝在很多框架中得到了廣泛應(yīng)用,一般都以Netty為例來(lái)分析。但作為大數(shù)據(jù)工程師,我就以Kafka與Spark為例來(lái)簡(jiǎn)單說(shuō)兩句吧。
在Kafka中的應(yīng)用
在使用Kafka時(shí),我們經(jīng)常會(huì)想,為什么Kafka能夠達(dá)到如此巨大的數(shù)據(jù)吞吐量?這與Kafka的很多設(shè)計(jì)哲學(xué)是分不開(kāi)的,比如分區(qū)并行、ISR機(jī)制、順序?qū)懭?、?yè)緩存、高效序列化等等,零拷貝當(dāng)然也是其中之一。由于Kafka的消息存儲(chǔ)涉及到海量數(shù)據(jù)讀寫(xiě),所以利用零拷貝能夠顯著地降低延遲,提高效率。
在Kafka中,底層傳輸動(dòng)作由TransportLayer接口來(lái)定義。它對(duì)SocketChannel進(jìn)行了簡(jiǎn)單的封裝,其中transferFrom()方法定義如下。(Kafka版本為0.10.2.2)
/**
* Transfers bytes from `fileChannel` to this `TransportLayer`.
*
* This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
* but it will unwrap the destination channel, if possible, in order to benefit from zero copy. This is required
* because the fast path of `transferTo` is only executed if the destination buffer inherits from an internal JDK
* class.
*
* @param fileChannel The source channel
* @param position The position within the file at which the transfer is to begin; must be non-negative
* @param count The maximum number of bytes to be transferred; must be non-negative
* @return The number of bytes, possibly zero, that were actually transferred
* @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
*/
long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;該方法的功能是將FileChannel中的數(shù)據(jù)傳輸?shù)絋ransportLayer,也就是SocketChannel。在實(shí)現(xiàn)類(lèi)PlaintextTransportLayer的對(duì)應(yīng)方法中,就是直接調(diào)用了FileChannel.transferTo()方法。
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}對(duì)該方法的調(diào)用則位于FileRecords.writeTo()方法中,用于將Kafka收到的緩存數(shù)據(jù)零拷貝地寫(xiě)入目的Channel。
@Override
public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
int count = Math.min(length, oldSize);
final long bytesTransferred;
if (destChannel instanceof TransportLayer) {
TransportLayer tl = (TransportLayer) destChannel;
bytesTransferred = tl.transferFrom(channel, position, count);
} else {
bytesTransferred = channel.transferTo(position, count, destChannel);
}
return bytesTransferred;
}在Spark中的應(yīng)用 Spark雖然是一個(gè)高效的積極使用內(nèi)存的計(jì)算框架,但在需要使用磁盤(pán)時(shí)也會(huì)適當(dāng)?shù)匾鐚?xiě)。零拷貝機(jī)制在Spark Core中主要就被用來(lái)優(yōu)化Shuffle過(guò)程中的溢寫(xiě)邏輯。由于Shuffle過(guò)程涉及大量的數(shù)據(jù)交換,因此效率當(dāng)然是越高越好。
在啟用Bypass機(jī)制的Sort Shuffle以及Tungsten Sort Shuffle的shuffle write階段,都使用了零拷貝來(lái)快速合并溢寫(xiě)文件的分片,有一個(gè)專(zhuān)門(mén)的配置項(xiàng)spark.file.transferTo來(lái)控制是否啟用零拷貝(默認(rèn)當(dāng)然是true)。以BypassMergeSortShuffleWriter為例,它最終是調(diào)用了通用工具類(lèi)Utils中的copyFileStreamNIO()方法。
def copyFileStreamNIO(
input: FileChannel,
output: FileChannel,
startPosition: Long,
bytesToCopy: Long): Unit = {
val initialPos = output.position()
var count = 0L
// In case transferTo method transferred less data than we have required.
while (count < bytesToCopy) {
count += input.transferTo(count + startPosition, bytesToCopy - count, output)
}
assert(count == bytesToCopy,
s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")
val finalPos = output.position()
val expectedPos = initialPos + bytesToCopy
assert(finalPos == expectedPos,
s"""
|Current position $finalPos do not equal to expected position $expectedPos
|after transferTo, please check your kernel version to see if it is 2.6.32,
|this is a kernel bug which will lead to unexpected behavior when using transferTo.
|You can set spark.file.transferTo = false to disable this NIO feature.
""".stripMargin)
}可見(jiàn),該方法用于將數(shù)據(jù)從一個(gè)FileChannel零拷貝到另一個(gè)FileChannel。通過(guò)控制起始位置和長(zhǎng)度參數(shù),就可以精確地將所有溢寫(xiě)文件拼合在一起了。

版權(quán)聲明:
文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??




