admin 管理员组文章数量: 887021
2023年12月23日发(作者:curle)
@Override protected void doOpen() throws Throwable{ bootstrap = new ServerBootstrap(); bossGroup = oopGroup(1, "NettyServerBoss"); workerGroup = oopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, T_IO_THREADS), "NettyServerWorker"); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nnels(); (bossGroup, workerGroup) .channel(SocketChannelClass()) .option(_REUSEADDR, ) .childOption(_NODELAY, ) .childOption(TOR, T) .childHandler(new ChannelInitializer
final public class NettyCodecAdapter { private final ChannelHandler encoder = new InternalEncoder(); private final ChannelHandler decoder = new InternalDecoder();
public ChannelHandler getEncoder() { return encoder; } public ChannelHandler getDecoder() { return decoder; }
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List
TF(orMessage()); } uffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } (); (); //响应的数据包长度 int len = nBytes(); checkPayload(channel, len); 2bytes(len, header, 12); // write Index(savedWriteIndex); ytes(header); // write header. Index(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // clear buffer Index(savedWriteIndex); // send error message to Consumer, otherwise, Consumer will wait till timeout. if (!t() && tus() != _RESPONSE) { Response r = new Response((), sion()); tus(_RESPONSE); if (t instanceof ExceedPayloadLimitException) { (sage(), t); try { orMessage(sage()); (r); return; } catch (RemotingException e) { ("Failed to send bad_response info back: " + sage() + ", cause: " + sage(), e); } } else { // FIXME log error message in Codec and handle in caught() of IoHanndler? ("Fail to encode response: " + res + ", send bad_response info instead, cause: " + sage(), t); try { orMessage("Failed to send response: " + res + ", cause: " + ng(t)); (r); return; } catch (RemotingException e) { ("Failed to send bad_response info back: " + res + ", cause: " + sage(), e); } } } // Rethrow exception if (t instanceof IOException) { throw (IOException) t; } else if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new RuntimeException(sage(), t); } } }
// decode response.
如果是响应信息 Response res = new Response(id); if ((flag & FLAG_EVENT) != 0) { //是否是event事件 nt(true); } // get status.
获取响应的状态 byte status = header[3]; tus(status); try { if (status == ) {//如果是 OK状态 Object data; if (t()) { //如果是事件响应 ObjectInput in = alize((), is, proto); data = decodeEventData(channel, in); } else { DecodeableRpcResult result; if (().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); (); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } ult(data); } else { ObjectInput in = alize((), is, proto); orMessage(F()); } } catch (Throwable t) { //如果不成功,直接设置错误状态,并将error信息返回 if (Enabled()) { ("Decode response failed: " + sage(), t); } tus(_ERROR); orMessage(ng(t)); } return res; } else { // decode request.
如果是请求信息 Request req = new Request(id); sion(tocolVersion()); //tocolVersion() == Dubbo RPC protocol version == 2.0.2 Way((flag & FLAG_TWOWAY) != 0); if ((flag & FLAG_EVENT) != 0) { nt(true); } try { Object data; if (t()) { ObjectInput in = alize((), is, proto); data = decodeEventData(channel, in); } else { DecodeableRpcInvocation inv; if (().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); (); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); }
data = inv; } a(data); } catch (Throwable t) { if (Enabled()) { ("Decode request failed: " + sage(), t); } // bad request ken(true); a(t); } return req; } }
版权声明:本文标题:九、ApacheDubbo3协议解析(三)Dubbo协议解析入口讲解 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1703320904h446810.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论