一、NIO類庫簡介
1、緩衝區Buffer
Buffer是一個對象,包含一些要寫入和讀出的數據。
在NIO中,所有的數據都是用緩衝區處理的,讀取數據時,它是從通道(Channel)直接讀到緩衝區中,在寫入數據時,也是從緩衝區寫入到通道。
緩衝區實質上是一個數組,通常是一個字節數組(ByteBuffer),也可以是其它類型的數組,此外緩衝區還提供了對數據的結構化訪問以及維護讀寫位置等信息。
Buffer類的繼承關係如下圖所示:
2、通道Channel
Channel是一個通道,網絡數據通過Channel讀取和寫入。通道和流的不同之處在於通道是雙向的(通道可以用於讀、寫後者二者同時進行),流只是在一個方向上移動。
Channel大體上可以分為兩類:用於網絡讀寫的SelectableChannel(ServerSocketChannel和SocketChannel就是其子類)、用於文件操作的FileChannel。
下面的例子給出通過FileChannel來向文件中寫入數據、從文件中讀取數據,將文件數據拷貝到另一個文件中:
public class NioTest{ public static void main(String[] args) throws IOException { copyFile(); } //拷貝文件private static void copyFile() { FileInputStream in=null; FileOutputStream out=null; try { in=new FileInputStream("src/main/java/data/in-data.txt"); out=new FileOutputStream("src/main/java/data/out-data.txt"); FileChannel inChannel=in.getChannel(); FileChannel outChannel=out.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); int bytesRead = inChannel.read(buffer); while (bytesRead!=-1) { buffer.flip(); outChannel.write(buffer); buffer.clear(); bytesRead = inChannel.read(buffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //寫文件private static void writeFileNio() { try { RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw"); FileChannel fc=fout.getChannel(); ByteBuffer buffer=ByteBuffer.allocate(1024); buffer.put("hi123".getBytes()); buffer.flip(); try { fc.write(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //讀文件private static void readFileNio() { FileInputStream fileInputStream; try { fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt"); FileChannel fileChannel=fileInputStream.getChannel();//從FileInputStream 獲取通道ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//創建緩衝區int bytesRead=fileChannel.read(byteBuffer);//將數據讀到緩衝區while(bytesRead!=-1) { /*limit=position * position=0; */ byteBuffer.flip(); //hasRemaining():告知在當前位置和限制之間是否有元素while (byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } /* * 清空緩衝區* position=0; * limit=capacity; */ byteBuffer.clear(); bytesRead = fileChannel.read(byteBuffer); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}3、多路復用器Selector
多路復用器提供選擇已經就緒的任務的能力。 Selector會不斷的輪詢註冊在其上的Channel,如果某個Channel上面發送讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll代替了傳統的select實現,所以它沒有最大連接句柄1024/2048的限制,意味著只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。其模型如下圖所示:
用單線程處理一個Selector。要使用Selector,得向Selector註冊Channel,然後調用它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子有如新連接進來,數據接收等。
注:
1、什麼select模型?
select是事件觸發機制,當等待的事件發生就觸發進行處理,多用於Linux實現的服務器對客戶端的處理。
可以阻塞地同時探測一組支持非阻塞的IO設備,是否有事件發生(如可讀、可寫,有高優先級錯誤輸出等),直至某一個設備觸發了事件或者超過了指定的等待時間。也就是它們的職責不是做IO,而是幫助調用者尋找當前就緒的設備。
2、什麼是epoll模型?
epoll的設計思路,是把select/poll單個的操作拆分為1個epoll_create+多個epoll_ctrl+一個wait。此外,內核針對epoll操作添加了一個文件系統”eventpollfs”,每一個或者多個要監視的文件描述符都有一個對應的eventpollfs文件系統的inode節點,主要信息保存在eventpoll結構體中。而被監視的文件的重要信息則保存在epitem結構體中。所以他們是一對多的關係。
二、NIO服務器端開發
功能說明:開啟服務器端,對每一個接入的客戶端都向其發送hello字符串。
使用NIO進行服務器端開發主要有以下幾個步驟:
1、創建ServerSocketChannel,配置它為非阻塞模式
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);
2、綁定監聽,配置TCP參數,如backlog大小
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
3、創建一個獨立的I/O線程,用於輪詢多路復用器Selector
4、創建Selector,將之前創建的ServerSocketChannel註冊到Selector上,監聽SelectionKey.ACCEPT
selector=Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
5、啟動I/O線程,在循環體內執行Selector.select()方法,輪詢就緒的Channel
while(true) { try { //select()阻塞到至少有一個通道在你註冊的事件上就緒了//如果沒有準備好的channel,就在這一直阻塞//select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。 selector.select(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); break; } }6、當輪詢到了處於就緒狀態的Channel時,需對其進行判斷,如果是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端
//返回已經就緒的SelectionKey,然後迭代執行Set<SelectionKey> readKeys=selector.selectedKeys(); for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) { SelectionKey key=it.next(); it.remove(); try { if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); client.register(selector,SelectionKey.OP_WRITE); } else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); } }catch(IOException e) { e.printStackTrace(); key.cancel(); try { key.channel().close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } }7、設置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置其他的一些TCP參數
if(key.isAcceptable()) { ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel client=server.accept(); client.configureBlocking(false); ... }8、將SocketChannel註冊到Selector,監聽OP_WRITE
client.register(selector,SelectionKey.OP_WRITE);
9、如果輪詢的Channel為OP_WRITE,則說明要向SockChannel中寫入數據,則構造ByteBuffer對象,寫入數據包
else if(key.isWritable()) { SocketChannel client=(SocketChannel) key.channel(); ByteBuffer buffer=ByteBuffer.allocate(20); String str="hello"; buffer=ByteBuffer.wrap(str.getBytes()); client.write(buffer); key.cancel(); }完整代碼如下:
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class ServerSocketChannelDemo{public static void main(String[] args) {ServerSocketChannel serverSocketChannel;Selector selector=null;try {serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.socket().bind(new InetSocketAddress(8080));selector=Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);}catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}while(true) {try {//select()阻塞到至少有一個通道在你註冊的事件上就緒了//如果沒有準備好的channel,就在這一直阻塞//select(long timeout)和select()一樣,除了最長會阻塞timeout毫秒(參數)。 selector.select();}catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();break;}//返回已經就緒的SelectionKey,然後迭代執行Set<SelectionKey> readKeys=selector.selectedKeys();for (Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();) {SelectionKey key=it.next();it.remove();try {if(key.isAcceptable()) {ServerSocketChannel server=(ServerSocketChannel) key.channel();SocketChannel client=server.accept();client.configureBlocking(false);client.register(selector,SelectionKey.OP_WRITE);} else if(key.isWritable()) {SocketChannel client=(SocketChannel) key.channel();ByteBuffer buffer=ByteBuffer.allocate(20);String str="hello";buffer=ByteBuffer.wrap(str.getBytes());client.write(buffer);key.cancel();}}catch(IOException e) {e.printStackTrace();key.cancel();try {key.channel().close();}catch (IOException e1) {// TODO Auto-generated catch blocke1.printStackTrace();}}}}}}我們用telnet localhost 8080模擬出多個客戶端:
程序運行結果如下:
總結
以上就是本文關於Java NIO服務器端開發詳解的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!