Java教程

Java NIO 入门教程 二 ServerSocketChannel 异步通信

IO文件操作 从心出发 2020-06-10 13:11:28.0 84 0条

1.Java (Socket,ServerSocket)与(SocketChannel,ServerSocketChannel)区别和联系

Socket 和ServerSocke 是一对 他们是java.net下面实现socket通信的类
SocketChannel 和ServerSocketChannel是一对 他们是java.nio下面实现通信的类 支持异步通信

服务器必须先建立ServerSocket或者ServerSocketChannel 来等待客户端的连接
客户端必须建立相对应的Socket或者SocketChannel来与服务器建立连接
服务器接受到客户端的连接受,再生成一个Socket或者SocketChannel与此客户端通信

不过Socket和SocketChannel可以通过 socket.channel() SocketChannel.socket() 方法相互转换
同理ServerSocket 和ServerSocketChannel 也可以相互转换

基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:

toc2Os.png

代码区分如下:

阻塞I/O Socket 和ServerSocke

  1. public class PlainOioServer {
  2. public void serve(int port) throws IOException {
  3. final ServerSocket socket = new ServerSocket(port); //1
  4. try {
  5. for (;;) {
  6. final Socket clientSocket = socket.accept(); //2
  7. System.out.println("Accepted connection from " + clientSocket);
  8. new Thread(new Runnable() { //3
  9. @Override
  10. public void run() {
  11. OutputStream out;
  12. try {
  13. out = clientSocket.getOutputStream();
  14. out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4
  15. out.flush();
  16. clientSocket.close(); //5
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. try {
  20. clientSocket.close();
  21. } catch (IOException ex) {
  22. // ignore on close
  23. }
  24. }
  25. }
  26. }).start(); //6
  27. }
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }

非阻塞IO SocketChannel 和ServerSocketChannel

  1. public class PlainNioServer {
  2. public void serve(int port) throws IOException {
  3. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  4. serverChannel.configureBlocking(false);
  5. ServerSocket ss = serverChannel.socket();
  6. InetSocketAddress address = new InetSocketAddress(port);
  7. ss.bind(address); //1
  8. Selector selector = Selector.open(); //2
  9. serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3
  10. final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
  11. for (;;) {
  12. try {
  13. selector.select(); //4
  14. } catch (IOException ex) {
  15. ex.printStackTrace();
  16. // handle exception
  17. break;
  18. }
  19. Set<SelectionKey> readyKeys = selector.selectedKeys(); //5
  20. Iterator<SelectionKey> iterator = readyKeys.iterator();
  21. while (iterator.hasNext()) {
  22. SelectionKey key = iterator.next();
  23. iterator.remove();
  24. try {
  25. if (key.isAcceptable()) { //6
  26. ServerSocketChannel server =
  27. (ServerSocketChannel)key.channel();
  28. SocketChannel client = server.accept();
  29. client.configureBlocking(false);
  30. client.register(selector, SelectionKey.OP_WRITE |
  31. SelectionKey.OP_READ, msg.duplicate()); //7
  32. System.out.println(
  33. "Accepted connection from " + client);
  34. }
  35. if (key.isWritable()) { //8
  36. SocketChannel client =
  37. (SocketChannel)key.channel();
  38. ByteBuffer buffer =
  39. (ByteBuffer)key.attachment();
  40. while (buffer.hasRemaining()) {
  41. if (client.write(buffer) == 0) { //9
  42. break;
  43. }
  44. }
  45. client.close(); //10
  46. }
  47. } catch (IOException ex) {
  48. key.cancel();
  49. try {
  50. key.channel().close();
  51. } catch (IOException cex) {
  52. // 在关闭时忽略
  53. }
  54. }
  55. }
  56. }
  57. }
  58. }

了解Java的NIO为非阻塞式Socket

Java的NIO为非阻塞式Socket通信提供了如下几个特殊类。

➢Selector:它是SelectableChannel对象的多路复用器,所有希望采用非阻塞方式进行通信的Channel都应该注册到Selector对象。可以通过调用此类的open()静态方法来创建Selector实例,该方法将使用系统默认的Selector来返回新的Selector。

Selector可以同时监控多个SelectableChannel 的I0状况,是非阻塞I0的核心。一个Selector实例有三个SelectionKey集合。

➢所有的SelectionKey集合:代表了注册在该Selector 上的Channel, 这个集合可以通过keys(方法返回。

➢被选择的SelectionKey集合:代表了所有可通过select(方法获取的、需要进行I0处理的Channel,这个集合可以通过selectedKeys(返回。

➢被取消的SelectionKey集合:代表了所有被取消注册关系的Channel,在下一次执行select(方法时,这些Channel对应的SelectionKey会被彻底删除,程序通常无须直接访问该集合。

除此之外,Selector 还提供了一系列和select()相关的方法,如下所示。

➢int select():监控所有注册的Channel,当它们中间有需要处理的I0操作时,该方法返回,并将对应的SelectionKey加入被选择的SelectionKey集合中,该方法返回这些Channel的数量。

➢int select(long timeout):可 以设置超时时长的select()操作。

➢int selectNow():执行一个立即返回的select()操作,相对于无参数的select(方法而言,该方法不会阻塞线程。

➢Selector wakeup():使一一个还 未返回的select(方法立刻返回。

➢SelectableChannel: 它代表可以支持非阻塞IO操作的Channel对象,它可被注册到Selector上,这种注册关系由SelectionKey实例表示。Selector 对象提供了-一个select(方法,该方法允许应用程序同时监控多个I0 Channel。

应用程序可调用SelectableChannel的register()方 法将其注册到指定Selector. 上,当该Selector. 上的某些SelectableChannel上有需要处理的I0操作时,程序可以调用Selector 实例的select()方法获取它们的数量,并可以通过selectedKeys(方法返回它们对应的SelectionKey集合一-通 过该集合就可以获取所有需要进行I0处理的SelectableChannel集。

SelectableChannel对象支持阻塞和非阻塞两种模式(所有的Channel默认都是阻塞模式),必须使用非阻塞模式才可以利用非阻塞I0操作。SelectableChannel提供了如下两个方法来设置和返回该Channel的模式状态。

➢SelectableChannel configureBlocking(boolean block):设置是否采用阻塞模式。

➢boolean isBlocking():返回该Channel是否是阻塞模式。

开发步骤

异步 I/O 中的核心对象名为 Selector。Selector 就是您注册对各种 I/O 事件的兴趣的地方,而且当那些事件发生时,就是这个对象告诉您所发生的事件。

所以,我们需要做的第一件事就是创建一个 Selector:

  1. Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件的兴趣。register() 的第一个参数总是这个 Selector。

1 打开一个 ServerSocketChannel

为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。

对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

  1. ServerSocketChannel ssc = ServerSocketChannel.open();
  2. ssc.configureBlocking( false);
  3. ServerSocket ss = ssc.socket();
  4. InetSocketAddress address = newInetSocketAddress( ports[ii] );
  5. ss.bind( address );

第一行创建一个新的 ServerSocketChannel ,最后三行将它绑定到给定的端口。第二行将 ServerSocketChannel 设置为 非阻塞的 。我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。

2. 选择键

下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

  1. SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

register() 的第一个参数总是这个 Selector。第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。

请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

3. 内部循环

现在已经注册了我们对一些 I/O 事件的兴趣,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

  1. intnum = selector.select();
  2. Set selectedKeys = selector.selectedKeys();
  3. Iterator it = selectedKeys.iterator();
  4. while(it.hasNext()) {
  5. SelectionKey key = (SelectionKey)it.next();
  6. // ... deal with I/O event ...
  7. }

首先,我们调用 Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。

接下来,我们调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的一个 集合 。

我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。

4. 监听新连接

程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

  1. if((key.readyOps() & SelectionKey.OP_ACCEPT)
  2. == SelectionKey.OP_ACCEPT) {
  3. // Accept the new connection
  4. // ...
  5. }

可以肯定地说, readOps() 方法告诉我们该事件是新的连接。

5. 接受新的连接

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

  1. ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
  2. SocketChannel sc = ssc.accept();

下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

  1. sc.configureBlocking( false);
  2. SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

注意我们使用 register() 的 OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。

6. 删除处理过的 SelectionKey

在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:

  1. it.remove();

现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。

7 传入的 I/O

当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

  1. } elseif((key.readyOps() & SelectionKey.OP_READ)
  2. == SelectionKey.OP_READ) {
  3. // Read the data
  4. SocketChannel sc = (SocketChannel)key.channel();
  5. // ...
  6. }

与以前一样,我们取得发生 I/O 事件的通道并处理它。在本例中,由于这是一个 echo server,我们只希望从套接字中读取数据并马上将它发送回去。关于这个过程的细节,请参见 参考资料 中的源代码 (MultiPortEcho.java)。

8. 回到主循环

每次返回主循环,我们都要调用 select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从 Selector 中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。

最后:

在Java 7以前,ServerSocketChannel的设计更糟糕一要 让ServerSocketChannel监听指定端口,必须先调用它的socket()方 法获取它关联的ServerSocket 对象,再调用ServerSocket的bind()方法去监听指定端口。Java 7为ServerSocketChannel 新增了bind()方法,因此稍微简单了一些。

所以上面得程序可以改成

  1. ServerSocketChannel serverChannel = ServerSocketChannel.open();
  2. serverChannel.configureBlocking(false);
  3. //ServerSocket ss = serverChannel.socket();
  4. InetSocketAddress address = new InetSocketAddress(port);
  5. serverChannel.bind(address);
暗锚,解决锚点偏移

文章评论

嘿,来试试登录吧!