admin 管理员组文章数量: 887021
Java NIO Selector , SelectionKey , SocketChannel , ServerSocketChannel
一 NIO介绍
1. NIO是非阻塞的
NIO非堵塞应用通常适用用在I/O读写等方面,我们知道,系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打开一个I/O通道后,read()将一直等待在端口一边读取字节内容,假如没有内容进来,read()也是傻傻的等,这会影响我们程序继续做其他事情,那么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源的。
2. 实现原理
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,假如有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等。
Selector就是观察者,观察 Server 端的ServerSocketChannel 和 Client 端的 SocketChannel ;前提是它们需要先注册到 同一个Selector,即观察者中;
----详细说明
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。Selector内部原理实际是在做一个对所注册的Channel(SocketChannel)的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,它就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。
while(!stop) {try {selector.select(1000); // 等待客户端发请求 1000msselector.selectedKeys().stream().forEach( key -> handleSelectionKey(key) );}catch(Exception e) {e.printStackTrace();}}
3. 小结
从外界看,实现了流畅的I/O读写,不堵塞了。Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
下文部分转载自
二 Selector , SelectionKey , SocketChannel , ServerSocketChannel 具体应用和功能
NIO的通讯过程
1. Selector (选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道。
-
java.nio.channels
-
public abstract class Selector extends Object implements Closeable
1.1 Selector 的创建
通过调用Selector.open()方法创建一个Selector;Selector selector = Selector.open();
isOpen() —— 判断Selector是否处于打开状态。Selector对象创建后就处于打开状态了。
close() —— 当调用了Selector对象的close()方法,就进入关闭状态.。用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。通道本身并不会关闭
1.2 ServerChanel 向 Selector 中注册
为了将Channel和Selector配合使用,必须将channel注册到selector上。
通过SelectableChannel。register()方法来实现。
-
channel.configureBlocking(false);
-
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着FIleChannel与Selector不能一起使用。
注意register()方法的第二个参数,这是一个”interest集合“,意思是在通过Selector监听Channel时对什么事件感兴趣。
可以监听四种不同类型的事件:
- Connect
- Accept
- Read
- Write
通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为”连接就绪“。一个server socket channel准备号接收新进入的连接称为”接收就绪“。一个有数据可读的通道可以说是”读就绪“。等代写数据的通道可以说是”写就绪“。
这四种事件用SelectionKey的四个常量来表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
2. register()返回值 —— SelectionKey, Selector中的SelectionKey集合
只要ServerSocketChannel及SocketChannel向Selector注册了特定的事件,Selector就会监控这些事件是否发生。
SelectableChannel的register()方法返回一个SelectionKey对象,该对象是用于跟踪这些被注册事件的句柄。
一个Selector对象会包含3种类型的SelectionKey集合:
- all-keys集合 —— 当前所有向Selector注册的SelectionKey的集合,Selector的keys()方法返回该集合
- selected-keys集合 —— 相关事件已经被Selector捕获的SelectionKey的集合,Selector的selectedKeys()方法返回该集合
- cancelled-keys集合 —— 已经被取消的SelectionKey的集合,Selector没有提供访问这种集合的方法
当register()方法执行时,新建一个SelectioKey,并把它加入Selector的all-keys集合中。
----selectionKey手动关闭 remove() 或cancel()
如果关闭了与SelectionKey对象关联的Channel对象,或者调用了SelectionKey对象的cancel方法,这个SelectionKey对象就会被加入到cancelled-keys集合中,表示这个SelectionKey对象已经被取消。
在执行Selector的select()方法时,如果与SelectionKey相关的事件发生了,这个SelectionKey就被加入到selected-keys集合中,程序直接调用selected-keys集合的remove()方法,或者调用它的iterator的remove()方法,都可以从selected-keys集合中删除一个SelectionKey对象。
3. SelectionKey——SelectableChannel 在 Selector 中的注册的标记/句柄。
register()方法返回一个SelectinKey对象,这个对象包含一些你感兴趣的属性:
- interest集合
- ready集合
- Channel
- Selector
- 附加的对象
通过调用某个SelectionKey的cancel()方法,关闭其通道,或者通过关闭其选择器来取消该Key之前,它一直保持有效。
取消某个Key之后不会立即从Selector中移除它,相反,会将该Key添加到Selector的已取消key set,以便在下一次进行选择操作的时候移除它。
- interest集合 —— 感兴趣的事件集合,可以通过SelectionKey读写interest集合,
-
int interestSet = selectionKey.interestOps();
-
boolean isInterestedInAccept = (interestSet & Selection.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
-
boolean isInterestedInConnect = interestSet & SelectioKey.OP_CONNECT;
-
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
-
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
- ready集合 —— 是通道已经准备就绪的操作的集合,在一个选择后,你会是首先访问这个ready set,
int readySet = selectionKey.readyOps();
可以向检测interet集合那样的方法,来检测channel中什么事件或操作已经就绪,也可以使用一下四个方法,
selectionKey.isAcceptable();selectionKey.isConnectable();selectionKey.isReadable();selectionKey.isWritable();
----Selector 内容
- 从SelectionKey中获取Channel和Selector:
-
Channel channel = selectionKey.channel();
-
Selector selector = selectionKey.selector();
- 附加的对象 —— 可以将一个对象或者更多的信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加与通道一起使用的Buffer,或是包含聚集数据的某个对象,
-
selectionKey.attach(theObject);
-
Object attachedObj = selectionKey.attachment();
4. 通过Selector选择就绪的通道
一旦向Selector注册了一个或多个通道,就可以调用几个重载的select()方法。
这些方法返回你所感兴趣的事件(连接,接受,读或写)已经准备就绪的那些通道。换句话说,如果你对”读就绪“的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。
- select() —— 阻塞到至少有一个通道在你注册的事件上就绪了
- select(long timeout) —— 和select()一样,除了最长会阻塞timeout毫秒
- selectNow() —— 不会阻塞,不管什么通道就绪都立刻返回;此方法执行非阻塞的选择操作,如果自从上一次选择操作后,没有通道变成可选择的,则此方法直接返回0
- select()方法返回的Int值表示多少通道就绪。
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectorKeys()方法,访问”已选择键集“中的就绪通道,
Set selectedKeys = selector.selectedKeys();
可以遍历这个已选择的集合来访问就绪的通道:
-
Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); if (key.isAcceptable()){ // a connection was accepted by a ServerSocketChannel }else if (key.isConnectable()){ // a connection was eatablished with a remote server }else if (key.isReadable()){ // a channel is ready for reading }else if (key.isWritable()){ // a channel is ready for writing } keyIterator.remove(); }
这个循环遍历已选择集中的每个键,并检测各个键所对象的通道的就绪事件。
注意每次迭代末尾的remove()调用,Selector不会自己从已选择集中移除SelectioKey实例,必须在处理完通道时自己移除。
5. Selector的wakeUp()方法
某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其他线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。
-------个人实现的一个 案例代码
import org.apache.commons.lang.StringUtils;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.logging.Logger;class MyTimeServer implements Runnable {private static Logger logger = Logger.getLogger("MyTimeServer");private int port;private volatile boolean stop;private Selector selector;private ServerSocketChannel servChannel;public MyTimeServer(int port) {this.port = port;}private void startTimeServer(){try{selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);servChannel.socket().bind(new InetSocketAddress(this.port),1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);this.stop = false;logger.info("TimeServer starts in port : " + port);}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stopTimeServer() {this.stop = true;try{selector.close();}catch (IOException e){e.printStackTrace();}}public void run() {startTimeServer();while(!stop) {try {selector.selectedKeys().stream().forEach( selectKey -> handleSelectionKey(selectKey) );}catch(Exception e) {e.printStackTrace();}}//多路复用器关闭以后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源if(selector != null) {stopTimeServer();}}private void handleSelectionKey(SelectionKey selectKey){try{handleRequest(selectKey);if(selectKey != null) {selectKey.cancel();if(selectKey.channel() != null) {selectKey.channel().close(); //处理client的key后 关闭该连接}}}catch(Exception e) {e.printStackTrace();}}private void handleRequest(SelectionKey selectKey) throws Exception {if(selectKey.isValid()) {if(selectKey.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectKey.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector , SelectionKey.OP_READ);}if(selectKey.isReadable()) {SocketChannel socketChannel = (SocketChannel)selectKey.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024); //预分配1024int readBytes = socketChannel.read(readBuffer);if(readBytes > 0) {byte[] readbytes = new byte[readBytes];readBuffer.position(0); // 重置 byteBuffer 的位置 position 变量readBuffer.get(readbytes , 0 , readBytes);handleReadContext(socketChannel,new String(readbytes , "UTF-8"));} else {selectKey.cancel();socketChannel.close();}}}}private void handleReadContext(SocketChannel socketChannel , String requestBody) {if(StringUtils.isNotBlank(requestBody) && requestBody.equals("time")) {logger.info("The timeServer receive request : " + requestBody);String response = "[success] msg : " + new Date(System.currentTimeMillis()).toString();try {reponse(socketChannel, response);} catch (Exception e) {e.printStackTrace();}}else {try {reponse(socketChannel, "[success] msg : nothing to say");} catch (Exception e) {e.printStackTrace();}}}private void reponse(SocketChannel socketChannel, String response) throws Exception {if(response != null && response.length() > 0) {byte[] reponesBytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(reponesBytes.length);writeBuffer.put(reponesBytes);writeBuffer.flip();socketChannel.write(writeBuffer);writeBuffer.clear();}}
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.logging.Logger;class MyTimeServer implements Runnable {private static Logger logger = Logger.getLogger("MyTimeServer");private int port;private volatile boolean stop;private Selector selector;private ServerSocketChannel servChannel;public MyTimeServer(int port) {this.port = port;}private void startTimeServer(){try{selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);servChannel.socket().bind(new InetSocketAddress(this.port),1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);this.stop = false;logger.info("TimeServer starts in port : " + port);}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stopTimeServer() {this.stop = true;try{selector.close();}catch (IOException e){e.printStackTrace();}}public void run() {startTimeServer();while(!stop) {try {selector.select(1000); // 等待客户端发请求 1000msselector.selectedKeys().stream().forEach( key -> handleSelectionKey(key) );}catch(Exception e) {e.printStackTrace();}}//多路复用器关闭以后,所有注册在上面的Channel和Pipe等资源都会自动去注册并关闭,所以不需要重复释放资源if(selector != null) {stopTimeServer();}}private void handleSelectionKey(SelectionKey key){try{handleRequest(key);if(key != null) {key.cancel();if(key.channel() != null) {key.channel().close(); //处理client的key后 关闭该连接}}}catch(Exception e) {e.printStackTrace();}}private void handleRequest(SelectionKey key) throws Exception {if(key.isValid()) {if(key.isAcceptable()) {ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector , SelectionKey.OP_READ);}if(key.isReadable()) {SocketChannel socketChannel = (SocketChannel)key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(readBuffer);if(readBytes > 0) {byte[] readbytes = new byte[readBytes];readBuffer.position(0); // 重置 byteBuffer 的位置 position 变量readBuffer.get(readbytes , 0 , readBytes);String body = new String(readbytes, "UTF-8");logger.info("The timeServer receive request : " + body);String response = "[success] msg : " + new Date(System.currentTimeMillis()).toString();reponse(socketChannel, response);} else {key.cancel();socketChannel.close();}}}}private void reponse(SocketChannel socketChannel, String response) throws Exception {if(response != null && response.length() > 0) {byte[] reponesBytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(reponesBytes.length);writeBuffer.put(reponesBytes);writeBuffer.flip();socketChannel.write(writeBuffer);writeBuffer.clear();}}
}
本文标签: Java NIOSelector SelectionKey SocketChannel ServerSocketChannel
版权声明:本文标题:Java NIOSelector , SelectionKey, SocketChannel , ServerSocketChannel 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1688269442h199600.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论