Netty分布式解碼器讀取數(shù)據(jù)不完整的邏輯剖析
概述
在我們上一個(gè)章節(jié)遺留過(guò)一個(gè)問(wèn)題, 就是如果Server在讀取客戶端的數(shù)據(jù)的時(shí)候, 如果一次讀取不完整, 就觸發(fā)channelRead事件, 那么Netty是如何處理這類問(wèn)題的, 在這一章中, 會(huì)對(duì)此做詳細(xì)剖析
之前的章節(jié)我們學(xué)習(xí)過(guò)pipeline, 事件在pipeline中傳遞, handler可以將事件截取并對(duì)其處理, 而之后剖析的編解碼器, 其實(shí)就是一個(gè)handler, 截取byteBuf中的字節(jié), 然后組建成業(yè)務(wù)需要的數(shù)據(jù)進(jìn)行繼續(xù)傳播
編碼器, 通常是OutBoundHandler, 也就是以自身為基準(zhǔn), 對(duì)那些對(duì)外流出的數(shù)據(jù)做處理, 所以也叫編碼器, 將數(shù)據(jù)經(jīng)過(guò)編碼發(fā)送出去
解碼器, 通常是inboundHandler, 也就是以自身為基準(zhǔn), 對(duì)那些流向自身的數(shù)據(jù)做處理, 所以也叫解碼器, 將對(duì)向的數(shù)據(jù)接收之后經(jīng)過(guò)解碼再進(jìn)行使用
同樣, 在netty的編碼器中, 也會(huì)對(duì)半包和粘包問(wèn)題做相應(yīng)的處理
什么是半包, 顧名思義, 就是不完整的數(shù)據(jù)包, 因?yàn)閚etty在輪詢讀事件的時(shí)候, 每次將channel中讀取的數(shù)據(jù), 不一定是一個(gè)完整的數(shù)據(jù)包, 這種情況, 就叫半包
粘包同樣也不難理解, 如果client往server發(fā)送數(shù)據(jù)包, 如果發(fā)送頻繁很有可能會(huì)將多個(gè)數(shù)據(jù)包的數(shù)據(jù)都發(fā)送到通道中, 如果在server在讀取的時(shí)候可能會(huì)讀取到超過(guò)一個(gè)完整數(shù)據(jù)包的長(zhǎng)度, 這種情況叫粘包
有關(guān)半包和粘包, 入下圖所示:

6-0-1
netty對(duì)半包的或者粘包的處理其實(shí)也很簡(jiǎn)單, 通過(guò)之前的學(xué)習(xí), 我們知道, 每個(gè)handler是和channel唯一綁定的, 一個(gè)handler只對(duì)應(yīng)一個(gè)channel, 所以將channel中的數(shù)據(jù)讀取時(shí)候經(jīng)過(guò)解析, 如果不是一個(gè)完整的數(shù)據(jù)包, 則解析失敗, 將這塊數(shù)據(jù)包進(jìn)行保存, 等下次解析時(shí)再和這個(gè)數(shù)據(jù)包進(jìn)行組裝解析, 直到解析到完整的數(shù)據(jù)包, 才會(huì)將數(shù)據(jù)包進(jìn)行向下傳遞
具體流程是在代碼中如何體現(xiàn)的呢?我們進(jìn)入到源碼分析中
第一節(jié): ByteToMessageDecoder
ByteToMessageDecoder解碼器, 顧名思義, 是一個(gè)將Byte解析成消息的解碼器,
我們看他的定義
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
//類體省略
}這里繼承了ChannelInboundHandlerAdapter, 根據(jù)之前的學(xué)習(xí), 我們知道, 這是個(gè)inbound類型的handler, 也就是處理流向自身事件的handler
其次, 該類通過(guò)abstract關(guān)鍵字修飾, 說(shuō)明是個(gè)抽象類, 在我們實(shí)際使用的時(shí)候, 并不是直接使用這個(gè)類, 而是使用其子類, 類定義了解碼器的骨架方法, 具體實(shí)現(xiàn)邏輯交給子類, 同樣, 在半包處理中也是由該類進(jìn)行實(shí)現(xiàn)的
netty中很多解碼器都實(shí)現(xiàn)了這個(gè)類, 并且, 我們也可以通過(guò)實(shí)現(xiàn)該類進(jìn)行自定義解碼器
我們重點(diǎn)關(guān)注一下該類的一個(gè)屬性:
ByteBuf cumulation;
這個(gè)屬性, 就是有關(guān)半包處理的關(guān)鍵屬性, 從概述中我們知道, netty會(huì)將不完整的數(shù)據(jù)包進(jìn)行保存, 這個(gè)數(shù)據(jù)包就是保存在這個(gè)屬性中
之前的學(xué)習(xí)我們知道, ByteBuf讀取完數(shù)據(jù)會(huì)傳遞channelRead事件, 傳播過(guò)程中會(huì)調(diào)用handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關(guān)鍵部分
我們看其channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果message是byteBuf類型
if (msg instanceof ByteBuf) {
//簡(jiǎn)單當(dāng)成一個(gè)arrayList, 用于盛放解析到的對(duì)象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//當(dāng)前累加器為空, 說(shuō)明這是第一次從io流里面讀取數(shù)據(jù)
first = cumulation == null;
if (first) {
//如果是第一次, 則將累加器賦值為剛讀進(jìn)來(lái)的對(duì)象
cumulation = data;
} else {
//如果不是第一次, 則把當(dāng)前累加的數(shù)據(jù)和讀進(jìn)來(lái)的數(shù)據(jù)進(jìn)行累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//調(diào)用子類的方法進(jìn)行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長(zhǎng)度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
//不是byteBuf類型則向下傳播
ctx.fireChannelRead(msg);
}
}這方法比較長(zhǎng), 帶大家一步步剖析
首先判斷如果傳來(lái)的數(shù)據(jù)是ByteBuf, 則進(jìn)入if塊中
CodecOutputList out = CodecOutputList.newInstance() 這里就當(dāng)成一個(gè)ArrayList就好, 用于盛放解碼完成的數(shù)據(jù)
ByteBuf data = (ByteBuf) msg 這步將數(shù)據(jù)轉(zhuǎn)化成ByteBuf
first = cumulation == null 這里表示如果cumulation == null, 說(shuō)明沒(méi)有存儲(chǔ)板半包數(shù)據(jù), 則將當(dāng)前的數(shù)據(jù)保存在屬性cumulation中
如果 cumulation != null , 說(shuō)明存儲(chǔ)了半包數(shù)據(jù), 則通過(guò)cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的數(shù)據(jù)和原來(lái)的數(shù)據(jù)進(jìn)行累加, 保存在屬性cumulation中
我們看cumulator屬性
private Cumulator cumulator = MERGE_CUMULATOR;
這里調(diào)用了其靜態(tài)屬性MERGE_CUMULATOR, 我們跟過(guò)去:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
//不能到過(guò)最大內(nèi)存
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
//將當(dāng)前數(shù)據(jù)buffer
buffer.writeBytes(in);
in.release();
return buffer;
}
};這里創(chuàng)建了Cumulator類型的靜態(tài)對(duì)象, 并重寫了cumulate方法, 這里cumulate方法, 就是用于將ByteBuf進(jìn)行拼接的方法:
方法中, 首先判斷cumulation的寫指針+in的可讀字節(jié)數(shù)是否超過(guò)了cumulation的最大長(zhǎng)度, 如果超過(guò)了, 將對(duì)cumulation進(jìn)行擴(kuò)容, 如果沒(méi)超過(guò), 則將其賦值到局部變量buffer中
然后將in的數(shù)據(jù)寫到buffer中, 將in進(jìn)行釋放, 返回寫入數(shù)據(jù)后的ByteBuf
回到channelRead方法中:
最后通過(guò)callDecode(ctx, cumulation, out)方法進(jìn)行解碼, 這里傳入了Context對(duì)象, 緩沖區(qū)cumulation和集合out:
我們跟到callDecode(ctx, cumulation, out)方法中:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//只要累加器里面有數(shù)據(jù)
while (in.isReadable()) {
int outSize = out.size();
//判斷當(dāng)前List是否有對(duì)象
if (outSize > 0) {
//如果有對(duì)象, 則向下傳播事件
fireChannelRead(ctx, out, outSize);
//清空當(dāng)前l(fā)ist
out.clear();
//解碼過(guò)程中如ctx被removed掉就break
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//當(dāng)前可讀數(shù)據(jù)長(zhǎng)度
int oldInputLength = in.readableBytes();
//子類實(shí)現(xiàn)
//子類解析, 解析玩對(duì)象放到out里面
decode(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//List解析前大小 和解析后長(zhǎng)度一樣(什么沒(méi)有解析出來(lái))
if (outSize == out.size()) {
//原來(lái)可讀的長(zhǎng)度==解析后可讀長(zhǎng)度
//說(shuō)明沒(méi)有讀取數(shù)據(jù)(當(dāng)前累加的數(shù)據(jù)并沒(méi)有拼成一個(gè)完整的數(shù)據(jù)包)
if (oldInputLength == in.readableBytes()) {
//跳出循環(huán)(下次在讀取數(shù)據(jù)才能進(jìn)行后續(xù)的解析)
break;
} else {
//沒(méi)有解析到數(shù)據(jù), 但是進(jìn)行讀取了
continue;
}
}
//out里面有數(shù)據(jù), 但是沒(méi)有從累加器讀取數(shù)據(jù)
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}這里首先循環(huán)判斷傳入的ByteBuf是否有可讀字節(jié), 如果還有可讀字節(jié)說(shuō)明沒(méi)有解碼完成, 則循環(huán)繼續(xù)解碼
然后判斷集合out的大小, 如果大小大于1, 說(shuō)明out中盛放了解碼完成之后的數(shù)據(jù), 然后將事件向下傳播, 并清空out
因?yàn)槲覀兊谝淮谓獯aout是空的, 所以這里不會(huì)進(jìn)入if塊, 這部分我們稍后分析, 這里繼續(xù)往下看
通過(guò) int oldInputLength = in.readableBytes() 獲取當(dāng)前ByteBuf, 其實(shí)也就是屬性cumulation的可讀字節(jié)數(shù), 這里就是一個(gè)備份用于比較, 我們繼續(xù)往下看:
decode(ctx, in, out)方法是最終的解碼操作, 這部會(huì)讀取cumulation并且將解碼后的數(shù)據(jù)放入到集合out中, 在ByteToMessageDecoder中的該方法是一個(gè)抽象方法, 讓子類進(jìn)行實(shí)現(xiàn), 我們使用的netty很多的解碼都是繼承了ByteToMessageDecoder并實(shí)現(xiàn)了decode方法從而完成了解碼操作, 同樣我們也可以遵循相應(yīng)的規(guī)則進(jìn)行自定義解碼器, 在之后的小節(jié)中會(huì)講解netty定義的解碼器, 并剖析相關(guān)的實(shí)現(xiàn)細(xì)節(jié), 這里我們繼續(xù)往下看:
if (outSize == out.size()) 這個(gè)判斷表示解析之前的out大小和解析之后out大小進(jìn)行比較, 如果相同, 說(shuō)明并沒(méi)有解析出數(shù)據(jù), 我們進(jìn)入到if塊中:
if (oldInputLength == in.readableBytes()) 表示cumulation的可讀字節(jié)數(shù)在解析之前和解析之后是相同的, 說(shuō)明解碼方法中并沒(méi)有解析數(shù)據(jù), 也就是當(dāng)前的數(shù)據(jù)并不是一個(gè)完整的數(shù)據(jù)包, 則跳出循環(huán), 留給下次解析, 否則, 說(shuō)明沒(méi)有解析到數(shù)據(jù), 但是讀取了, 所以跳過(guò)該次循環(huán)進(jìn)入下次循環(huán)
最后判斷 if (oldInputLength == in.readableBytes()) , 這里代表out中有數(shù)據(jù), 但是并沒(méi)有從cumulation讀數(shù)據(jù), 說(shuō)明這個(gè)out的內(nèi)容是非法的, 直接拋出異常
我們回到channRead方法中
我們關(guān)注finally中的內(nèi)容:
finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長(zhǎng)度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}首先判斷cumulation不為null, 并且沒(méi)有可讀字節(jié), 則將累加器進(jìn)行釋放, 并設(shè)置為null
之后記錄out的長(zhǎng)度, 通過(guò)fireChannelRead(ctx, out, size)將channelRead事件進(jìn)行向下傳播, 并回收out對(duì)象
我們跟到fireChannelRead(ctx, out, size)方法中:
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
//遍歷List
for (int i = 0; i < numElements; i ++) {
//逐個(gè)向下傳遞
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}這里遍歷out集合, 并將里面的元素逐個(gè)向下傳遞
以上就是有關(guān)解碼的骨架邏輯
更多關(guān)于Netty分布式解碼器讀取數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot aop切到service層,不生效問(wèn)題
這篇文章主要介紹了springboot aop切到service層,不生效問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
SpringBoot可視化接口開(kāi)發(fā)工具magic-api的簡(jiǎn)單使用教程
作為Java后端開(kāi)發(fā),平時(shí)開(kāi)發(fā)API接口的時(shí)候經(jīng)常需要定義Controller、Service、Dao、Mapper、XML、VO等Java對(duì)象。有沒(méi)有什么辦法可以讓我們不寫這些代碼,直接操作數(shù)據(jù)庫(kù)生成API接口呢?今天給大家推薦一款工具magic-api,來(lái)幫我們實(shí)現(xiàn)這個(gè)小目標(biāo)!2021-06-06
Java實(shí)例化一個(gè)抽象類對(duì)象的方法教程
大家都知道抽象類無(wú)法實(shí)例化,就無(wú)法創(chuàng)建對(duì)象。所以下面這篇文章主要給大家介紹了關(guān)于Java實(shí)例化一個(gè)抽象類對(duì)象的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-12-12
為zookeeper配置相應(yīng)的acl權(quán)限
這篇文章主要介紹了為zookeeper配置相應(yīng)的acl權(quán)限的相關(guān)實(shí)例,具有一定參考價(jià)值,需要的朋友可以了解下。2017-09-09
淺談Java中SimpleDateFormat 多線程不安全原因
SimpleDateFormat是Java中用于日期時(shí)間格式化的一個(gè)類,本文主要介紹了淺談Java中SimpleDateFormat 多線程不安全原因,感興趣的可以了解一下2024-01-01
Java讀取項(xiàng)目json文件并轉(zhuǎn)為JSON對(duì)象的操作
這篇文章主要介紹了Java讀取項(xiàng)目json文件并轉(zhuǎn)為JSON對(duì)象的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08

