이 기사는 전통적인 바이오에서 NIO, AIO에 이르기까지 얕은 곳에서 깊이까지 소개되며 완전한 코드 설명이 동반됩니다.
예제는 다음 코드에서 사용됩니다. 클라이언트는 방정식 문자열을 서버로 보냅니다. 서버는 계산 후 클라이언트에 결과를 반환합니다.
코드에 대한 모든 지침은 주석으로 직접 사용되며 코드에 포함되어 있으며 코드를 읽을 때 이해하기 쉽습니다. 결과를 계산하기위한 도구 클래스는 코드에서 사용됩니다. 기사의 코드 섹션을 참조하십시오.
관련 기본 지식에 대한 권장 기사 :
Linux 네트워크 I/O 모델 소개 (그림 및 텍스트)
Java 동시성 (멀티 스레딩)
1. 바이오 프로그래밍
1.1. 전통적인 바이오 프로그래밍
네트워크 프로그래밍의 기본 모델은 C/S 모델, 즉 두 프로세스 간의 통신입니다.
서버는 IP 및 청취 포트를 제공합니다. 클라이언트는 서버가 듣고 자하는 연결 작업 주소를 통해 연결 요청을 시작합니다. 3 개의 핸드 셰이크를 통해 연결이 성공적으로 설정되면 양 당사자는 소켓을 통해 통신 할 수 있습니다.
기존 동기화 차단 모델 개발에서 Serversocket은 IP 주소를 바인딩하고 청취 포트를 시작하는 데 책임이 있습니다. 소켓은 연결 작업을 시작합니다. 연결이 성공한 후 양 당사자는 입력 및 출력 스트림을 통해 동기식 차단 통신을 수행합니다.
바이오 서버 통신 모델에 대한 간단한 설명 : 바이오 통신 모델을 사용하는 서버는 일반적으로 클라이언트의 연결을 듣는 독립적 인 수용자 스레드입니다. 클라이언트 연결 요청을 수신 한 후 링크 처리를 위해 각 클라이언트에 대한 새 스레드를 생성하고 처리하지 못한 다음 출력 스트림을 통해 클라이언트에 응답을 반환하면 스레드가 파괴됩니다. 즉, 전형적인 1 회 반복 대자 밤새 모델입니다.
전통적인 바이오 통신 모델 다이어그램 :
이 모델의 가장 큰 문제는 탄성 스케일링 기능이 부족하다는 것입니다. 클라이언트의 동시 액세스 수가 증가하면 서버의 스레드 수는 클라이언트의 동시 액세스 수에 비례합니다. Java의 스레드는 또한 비교적 귀중한 시스템 리소스입니다. 스레드 수가 빠르게 확장되면 시스템의 성능이 급격히 떨어집니다. 액세스 수가 계속 증가함에 따라 시스템은 결국 죽을 것입니다.
동기 차단 I/O에 의해 생성 된 서버 소스 코드 :
패키지 com.anxpp.io.calculator.bio; import java.io.ioexception; import java.net.serversocket; import java.net.socket; /** * 바이오 서버 소스 코드 * @author yangtao_anxpp.com * @version 1.0 */public final class servernormal {// 기본 포트 번호 비공개 정적 int default_port = 12345; // Singleton Serversocket Private STATIC SERVERSOCKE SERVER; // 들어오는 매개 변수에 따라 청취 포트를 설정합니다. 매개 변수가없는 경우 다음 메소드를 호출하고 기본값을 사용하여 public static void start ()가 IoException {// 기본값 시작 (default_port)을 사용하십시오. } //이 메소드는 많은 동시 방식으로 액세스되지 않으며 효율성을 고려할 필요가 없으며 메소드를 직접 공개적으로 동기화 된 정적 void start (int port)를 동기화 할 필요가 없습니다. ioexception {if (server! = null) return; {// 생성자를 통해 서버 소켓 생성 // 포트가 합법적이고 유휴 상태 인 경우 서버는 성공적으로 청취합니다. 서버 = 새 서버 소켓 (포트); System.out.println ( "서버가 시작되었습니다. 포트 번호 :" + 포트); // 무선 루프를 통해 클라이언트 연결을 듣습니다. // 클라이언트 액세스가없는 경우 수락 작업에서 차단됩니다. while (true) {Socket Socket = Server.accept (); // 새 클라이언트 액세스가 있으면 다음 코드가 실행됩니다. 그런 다음이 소켓 링크를 처리 할 새 스레드를 만듭니다. 새 스레드 (새 ServerHandler (Socket)). start (); }} 마침내 {// (서버! = null) {System.out.println ( "서버가 닫혔습니다."); Server.Close (); 서버 = null; }}}} 클라이언트 메시지 처리 스레드 ServerHandler 소스 코드 :
패키지 com.anxpp.io.calculator.bio; import java.io.bufferedReader; import java.io.ioexception; import java.io.inputStreamReader; import java.io.printwriter; import java.net.socket; import com.anxpp.io.utils.calculator; / *** 클라이언트 스레드* @Author Yangtao_anxpp.com* 클라이언트의 소켓 링크*/ public class serverhandler emplements runnable {개인 소켓 소켓; Public ServerHandler (소켓 소켓) {this.socket = 소켓; } @override public void run () {bufferedReader in = null; printwriter out = null; try {in = new bufferedReader (new inputStreamReader (socket.getInputStream ())); out = new printwriter (socket.getOutputStream (), true); 문자열 표현식; 문자열 결과; while (true) {// BufferedReader를 통해 줄을 읽으십시오. // 입력 스트림의 꼬리를 읽은 경우 null을 반환하고 루프를 종료하십시오 // null 값을 얻지 못하면 결과를 계산하고 ((expression = in.readline ()) == null) break; System.out.println ( "서버는" + expression을 받았다); try {result = calculator.cal (expression) .toString (); } catch (Exception e) {result = "calculator.cal (expression) .tostring ();} catch (예외 e) {e.printstacktrace ();} 마침내 {// 필요한 일부 정리 작업 (in! = null) {in.close ();} catch (ioexception (ioxception) {e.printstacktrace (); null) {socket! = null} {ioexception e); 동기 차단 I/O에 의해 생성 된 클라이언트 소스 코드 :
패키지 com.anxpp.io.calculator.bio; import java.io.bufferedReader; import java.io.ioexception; import java.io.inputStreamReader; import java.io.printwriter; import java.net.socket; /** * I/O를 차단하여 생성 된 클라이언트 * @Angtao_anxpp.com * @version 1.0 */public class client {// 기본 포트 번호 개인 정적 int default_server_port = 12345; 개인 정적 문자열 default_server_ip = "127.0.0.1"; public static void send (문자열 표현) {send (default_server_port, expression); } public static void send (int port, string expression) {system.out.println ( "산술 표현식은 :" + 표현); 소켓 소켓 = null; 버퍼드 리더 in = null; printwriter out = null; try {socket = new Socket (default_server_ip, port); in = new bufferedReader (new inputStreamReader (socket.getInputStream ())); out = new printwriter (socket.getOutputStream (), true); out.println (표현); System.out.println ( "___ 결과는 다음과 같습니다." + in.readline ()); } catch (예외 e) {e.printstacktrace (); } 마침내 {// (in! = null) {try {in.close (); } catch (ioexception e) {e.printstacktrace (); } in = null; } if (out! = null) {out.close (); out = null; } if ((null) {try {socket.close (); } catch (ioexception e) {e.printstacktrace (); } socket = null; }}}} 콘솔에서 출력 결과를보기 위해 코드를 테스트하고, 동일한 프로그램 (JVM)에 넣을 수 있도록 다음과 같습니다.
패키지 com.anxpp.io.calculator.bio; import java.io.ioexception; java.util.random import; /** * 테스트 방법 * @Author Yangtao__anxpp.com * @version 1.0 */public class test {// 메인 메소드 테스트 공개 정적 무효 메인 (String [] args)은 인터럽트 exception {// 서버를 실행하여 새로운 스레드 () {@override public void run () {serverbetcect.start (); e.printstacktrace (); start (); // 서버가 시작되기 전에 클라이언트가 코드를 실행하는 것을 피하십시오. // 클라이언트 char 연산자를 실행합니다 [] = { '+', '-', '*', '/'}; random random = new random (System.CurrentTimeMillis ()); new Thread (new Runnable () {@suppresswarnings ( "static-access") @override public void run () {while (true) {// random chertic expression expression = random.nextint (10)+""+연산자 [random.nextint (4)]+(random.nextint (10) +1); client.send (표현); thread.currentthread (). sleep (random.nextint (1000)); }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}. }}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}. } } } } } } } } { 실. currentthread (). sleep (random.nextint (1000)); } } } } } } } } } } } } } } } } } } } } } } } } } } } } } } { 실. currentthread (). sleep (random.nextint (1000)); }}}}}}}}} 달리기 중 하나의 결과 :
서버가 시작되었습니다. 포트 번호 : 12345 산술 표현은 다음과 같습니다. 4-2 서버는 메시지를 받았습니다. 메시지를 받았습니다. 1/6__ 결과는 다음과 같습니다. 0.166666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666 666666666666666666666666666666666666666666666666666666666666666666666666666666
위의 코드에서 바이오의 주요 문제는 새 클라이언트가 액세스를 요청할 때마다 서버 가이 링크를 처리하기 위해 새 스레드를 만들어야하며, 이는 고성능 및 높은 동시성이 필요한 시나리오에서 적용 할 수없는 새로운 스레드를 만들어야한다는 것입니다 (많은 새로운 스레드가 서버 성능에 심각하게 영향을 미치고 심지어 스트라이크에 영향을 미칩니다).
1.2. 유사 아산형 I/O 프로그래밍
이 원 연결-하나의 스레드 모델을 개선하기 위해 스레드 풀을 사용하여 이러한 스레드를 관리 할 수 있습니다 (자세한 내용은 이전에 제공된 기사를 참조하십시오. 이전에 제공된 기사를 참조하십시오). 하나 이상의 스레드에 대한 모델을 N 클라이언트 (기본 레이어는 여전히 동기식 차단 I/O를 사용합니다)는 종종 "pseudo-asynchronous I/O 모델"이라고합니다.
유사 아산형 I/O 모델 다이어그램 :
구현은 매우 간단합니다. 새로운 스레드를 스레드 풀 관리로 넘겨주고 지금은 서버 코드를 변경하면됩니다.
패키지 com.anxpp.io.calculator.bio; import java.io.ioexception; import java.net.serversocket; import java.net.socket; import java.util.concurrent.executorservice; java.util.concurrent.executors import; /** * 바이오 서버 소스 코드 코드 __pseudo-asynchronous i/o * @author yangtao_anxpp.com * @version 1.0 */public final class serverBetter {// 기본 포트 번호 비공개 정적 int default_port = 12345; // Singleton Serversocket Private STATIC SERVERSOCKE SERVER; // 싱글 톤 개인 정적 ExecutorService executorService = executors.newfixedthreadpool (60); // 들어오는 매개 변수에 따라 청취 포트를 설정합니다. 매개 변수가없는 경우 다음 메소드를 호출하고 기본값 public static void start ()를 사용하여 ioexception {// 기본값 시작 (default_port)을 사용하십시오. } //이 방법은 동시에 다수의 액세스하지 않으며 효율성을 고려할 필요가 없으며, 메소드를 직접 공개적으로 동기화 된 정적 void start (int port)를 동기화 할 필요가 없습니다. int port는 ioexception {if (server! = null) return; {// 생성자를 통해 서버 소켓 생성 // 포트가 합법적이고 유휴 상태 인 경우 서버는 성공적으로 청취합니다. 서버 = 새 서버 소켓 (포트); System.out.println ( "서버가 시작되었습니다. 포트 번호 :" + 포트); // 무선 루프를 통한 클라이언트 연결 // 클라이언트 액세스가없는 경우 수락 작업에서 차단됩니다. while (true) {Socket Socket = Server.accept (); // 새 클라이언트 액세스가 있으면 다음 코드가 실행됩니다. 그런 다음 소켓 링크 executorService.Execute (new ServerHandler (Socket))를 처리하기 위해 새 스레드를 만듭니다. }} 마침내 {// (서버! = null) {System.out.println ( "서버가 닫혔습니다."); Server.Close (); 서버 = null; }}}} 테스트 실행 결과는 동일합니다.
캐시드 스레드 풀 스레드 풀 (스레드 수를 제한하지 않으면 명확하지 않은 경우 기사의 시작 부분에서 제공된 기사를 참조하십시오)을 사용하는 경우 실제로 스레드 (재사용)를 자동으로 관리하는 것 외에도 1 : 1 클라이언트 : 스레드 카운트 모델처럼 보입니다. FixedThreadpool을 사용하여 최대 스레드 수를 효과적으로 제어하고 시스템의 제한된 자원을 제어하고 N : M Pseudo-Asynchronous I/O 모델을 구현합니다.
그러나 스레드 수가 제한되어 있으므로 많은 동시 요청이 발생하면 최대 수를 초과하는 스레드는 재사용 할 수있는 스레드 풀에 무료 스레드가있을 때까지 기다릴 수 있습니다. 소켓 입력 스트림을 읽으면 발생할 때까지 차단됩니다.
따라서 읽기 데이터가 느리면 (많은 양의 데이터, 느린 네트워크 전송 등) 많은 양의 동시성이있을 때 다른 액세스 메시지는 항상 기다릴 수 있습니다. 이는 가장 큰 단점입니다.
나중에 소개 될 NIO는이 문제를 해결할 수 있습니다.
2. NIO 프로그래밍
새로운 Java I/O 라이브러리는 속도를 높이기 위해 JDK 1.4의 Java.nio.* 패키지에 도입됩니다. 실제로, "오래된"I/O 패키지는 NIO를 사용하여 다시 구현되었으며, NIO 프로그래밍을 명시 적으로 사용하지 않더라도 이익을 얻을 수 있습니다. 파일 I/O와 네트워크 I/O에서 속도 개선이 발생할 수 있지만이 기사에서는 후자에 대해서만 설명합니다.
2.1. 소개
우리는 일반적으로 NIO를 새로운 I/O (공식 이름)로 생각합니다. 왜냐하면 그것은 이전 I/O 라이브러리에 새롭기 때문입니다 (실제로 JDK 1.4에 소개되었지만이 명사는 오랫동안 사용될 것이므로 지금은 "오래된"경우에도 계속 사용될 때도 계속 고려해야한다는 것을 상기시켜줍니다). 그러나 많은 사람들이 비 블록 I/O라고 불립니다. 다음 텍스트의 NIO는 완전히 새로운 I/O 라이브러리를 참조하지 않지만 I/O를 차단하지는 않습니다.
NIO는 기존 바이오 모델의 소켓 및 서버 소켓에 해당하는 Socketchannel 및 Serversocketchannel의 두 가지 소켓 채널 구현을 제공합니다.
새로 추가 된 채널 모두 블록 및 비 차단 모드를 지원합니다.
차단 모드의 사용은 전통적인 지원만큼 간단하지만 성능과 신뢰성은 좋지 않습니다. 비 블로킹 모드는 정확히 반대입니다.
저재량이 적은 저속 적용 응용 프로그램의 경우 동기 차단 I/O를 사용하여 개발 속도를 향상시키고 유지 보수를 개선 할 수 있습니다. 고 부하, 고음성 (네트워크) 애플리케이션의 경우 NIO의 비 블로킹 모드를 사용해야합니다.
기본 지식은 아래에서 먼저 소개됩니다.
2.2. 버퍼 버퍼
버퍼는 작성하거나 읽을 데이터가 포함 된 객체입니다.
NIO 라이브러리에서 모든 데이터는 버퍼로 처리됩니다. 데이터를 읽을 때 버퍼로 직접 읽습니다. 데이터를 작성할 때 버퍼에 기록됩니다. NIO에서 데이터에 액세스 할 때마다 버퍼를 통해 작동됩니다.
버퍼는 실제로 배열이며 데이터에 대한 구조화 된 액세스 및 읽기 및 쓰기 위치 유지와 같은 정보를 제공합니다.
특정 캐시 영역은 바이트 버프, Charbuffer, Shortbuffer, Intbuffer, Longbuffer, Floatbuffer, Doublebuffer입니다. 동일한 인터페이스를 구현합니다. 버퍼.
2.3. 채널
데이터의 읽기 및 쓰기는 수로, 수로와 같은 채널을 통해 전달되어야합니다. 채널과 스트림의 차이점은 채널이 양방향이며 읽기, 쓰기 및 동시 읽기 및 쓰기 작업에 사용할 수 있다는 것입니다.
기본 운영 체제의 채널은 일반적으로 전이중이므로 전체 이중 채널은 스트림보다 기본 운영 체제의 API를 더 잘 매핑 할 수 있습니다.
채널은 주로 두 가지 범주로 나뉩니다.
다음 코드에 관여 할 서버 Socketchannel 및 Socketchannel은 SelectableChannel의 서브 클래스입니다.
2.4. 멀티플렉서 선택기
선택기는 Java Nio 프로그래밍의 기초입니다.
선택기는 준비된 작업을 선택할 수있는 기능을 제공합니다. 선택기는 등록 된 채널을 지속적으로 투표합니다. 채널에서 읽기 또는 쓰기 이벤트가 발생하면 채널은 준비 상태에 있으며 선택기가 투표합니다. 그런 다음 선택 키를 통해 준비된 채널 세트를 얻을 수있어 후속 I/O 작업을 수행 할 수 있습니다.
JDK는 기존 선택 구현 대신 epoll ()을 사용하기 때문에 선택기는 동시에 여러 채널을 폴링 할 수 있습니다. 최대 연결 핸들 1024/2048에는 제한이 없습니다. 따라서 선택자의 폴링에 대한 책임이있는 스레드 만 있으면 수천 명의 클라이언트에 액세스 할 수 있습니다.
2.5. NIO 서버
이 코드는 기존 소켓 프로그래밍보다 훨씬 더 복잡해 보입니다.
코드를 붙여 넣고 코드 설명을 주석 형태로 제공하십시오.
NIO가 생성 한 서버 소스 코드 :
패키지 com.anxpp.io.calculator.nio; 공개 클래스 서버 {private static int default_port = 12345; 개인 정적 서버 핸들 서버 핸들; public static void start () {start (default_port); } public static synchronized void start (int port) {if (serverHandle! = null) serverHandle.stop (); ServerHandle = New ServerHandle (포트); 새 스레드 (ServerHandle, "Server"). start (); } public static void main (String [] args) {start (); }} ServerHandle :
패키지 com.anxpp.io.calculator.nio; import java.io.ioexception; import java.net.inetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.iterator; java.util.set import; import com.anxpp.io.utils.calculator; / ** * nio server * @author Yangtao_anxpp.com * @version 1.0 */ public class serverhandle emplements runnable {private selector selector; Private Serversocketchannel ServerChannel; 개인 휘발성 부울 시작; /*** 생성자* @Param 포트 청취 할 포트 번호를 지정*/public serverHandle (int port) {try {// selector = selector.open (); // 청취 채널을 엽니 다 // 사실이라면이 채널은 차단 모드에 배치됩니다. False 인 경우이 채널은 비 차단 모드 ServerChannel.configureBlocking (false)에 배치됩니다. // 비 블로킹 모드 활성화 // 바인드 포트 백 로그는 1024 ServerChannel.socket ()로 설정됩니다 (new inetSocketAddress (port), 1024); // SuperCE Client Connection Request ServerChannel.register (selector, selectionKey.op_accept); // 서버가 활성화되어 시작 = true; System.out.println ( "서버가 시작되었습니다. 포트 번호 :" + 포트); } catch (ioexception e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @override public void run () {// 선택기를 통한 루프 (시작) {try {// 읽기 및 쓰기 이벤트가 있든, 선택기는 1s selector.select (1000); // 차단하면 적어도 하나의 등록 된 이벤트가 발생하는 경우에만 계속됩니다. // selector.select (); <selectionkey> keys = selector.selectedKeys (); 반복자 <selectionkey> it = keys.iterator (); selectionkey key = null; while (it.hasnext ()) {key = it.next (); it.remove (); try {handleInput (키); } catch (예외 e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). cose (); }}}}}}}}}} catch (Throwable t) {t.printstacktrace (); }} // 선택기가 닫힌 후 관리되는 리소스는 (selector! = null) try {selector.close (); } catch (예외 e) {e.printstacktrace (); }} private void handinput (selectionKey Key)는 ioException을 던졌습니다. ioException {if (key.isvalid ()) {// 새 액세스에 대한 요청 메시지를 처리합니다. if (key.isacceptable ()) {serversocketchannel ssc = (serversocketchannel) key.channel (); // serversocketchannel의 수락을 통해 Socketchannel 인스턴스 생성 //이 작업은 TCP 3 방향 핸드 셰이크를 완료하는 것을 의미하며 TCP 물리적 링크가 공식적으로 설정되었습니다. Socketchannel sc = ssc.accept (); // 비 블로킹으로 설정 sc.configureBlocking (false); // 읽기 sc.register (selector, selectionKey.Op_read)로 등록; } // 메시지를 읽습니다. if (key.isreadable ()) {socketchannel sc = (socketchannel) key.channel (); // 바이트 버퍼를 생성하고 1m 버퍼 바이트 버퍼 버퍼를 엽니 다. // 요청 스트림을 읽고 바이트 수를 반환합니다 int readBytes = sc.read (buffer); // 바이트를 읽고 바이트를 코딩하고 (readBytes> 0) {// 버퍼의 현재 한계를 위치 = 0으로 설정합니다. // 버퍼 수를 기준으로 바이트 배열을 만듭니다. 읽기 가능한 바이트 바이트 = 새 바이트 [buffer.remaining ()]; // 버퍼 읽기 가능한 바이트 배열을 새로 생성 된 배열 버퍼에 복사합니다. 문자열 expression = 새 문자열 (바이트, "UTF-8"); System.out.println ( "서버는" + 표현식을 받았습니다); // 데이터 처리 문자열 result = null; try {result = calculator.cal (expression) .toString (); } catch (예외 e) {result = "계산 오류 :" + e.getMessage (); } // 답장 메시지 보내기 dowrite (sc, result); } // 바이트 없음 읽기 및 무시 // else if (readBytes == 0); // 링크가 닫혀서 리소스를 자유롭게하면 else if (readBytes <0) {key.cancel (); sc.close (); }}}}} // 응답 메시지 보내기 메시지를 보냅니다. // 배열 용량에 따라 바이트 버퍼 생성 바이트 버퍼 writebuffer = bytebuffer.allocate (bytes.length); // 바이트 배열을 버퍼에 복사하여 writeBuffer.put (bytes); // 플립 작업 writeBuffer.flip (); // 버퍼 채널의 바이트 배열을 보내십시오 .Write (WriteBuffer); // ***** "Half-Packet 쓰기"처리 코드는 여기에 포함되지 않습니다}}보시다시피, NIO 서버를 만드는 주요 단계는 다음과 같습니다.
응답 메시지가 전송되기 때문에 Socketchannel도 비동기식 및 비 블로킹이므로 전송 해야하는 데이터를 한 번에 전송할 수 있으며 현재 반 패킷을 작성하는 데 문제가 있습니다. 우리는 쓰기 작업을 등록하고, 선택기를 지속적으로 설문 조사하여 Unsent 메시지를 보내고, 버퍼의 hasremain () 메소드를 사용하여 메시지가 전송되는지 여부를 결정해야합니다.
2.6. NIO 클라이언트
코드를 업로드하는 것이 좋습니다. 프로세스에 너무 많은 설명이 필요하지 않으며 서버 코드와 약간 유사합니다.
고객:
패키지 com.anxpp.io.calculator.nio; 공개 클래스 클라이언트 {private static string default_host = "127.0.0.1"; private static int default_port = 12345; 개인 정적 클라이언트 핸들 클라이언트 핸들; public static void start () {start (default_host, default_port); } public static synchronized void start (string ip, int port) {if (clientHandle! = null) clientHandle.stop (); clientHandle = new ClientHandle (IP, 포트); 새 스레드 (ClientHandle, "Server"). start (); } // 서버로 메시지를 보내십시오 public static boolean sendmsg (문자열 msg)는 예외를 던지려면 {if (msg.equals ( "q")) return false; clientHandle.sendmsg (msg); 진실을 반환하십시오. } public static void main (String [] args) {start (); }} ClientHandle :
패키지 com.anxpp.io.calculator.nio; import java.io.ioexception; import java.net.inetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.socketchannel; import java.util.iterator; java.util.set import; / ** * nio client * @author yangtao_anxpp.com * @version 1.0 */ public class clientHandle emplements runnable {private string host; 개인 int 포트; 개인 선택기 선택기; 전용 양말 채널 양말; 개인 휘발성 부울 시작; Public ClientHandle (String IP, Int Port) {this.host = ip; this.port = 포트; try {// selector selector 만들기 = selector.open (); // 청취 채널을 엽니 다. // 사실이라면이 채널은 차단 모드에 배치됩니다. False 인 경우이 채널은 비 차단 모드에 배치됩니다. Booketchannel.configureBlocking (False); // 개방 비 블로킹 모드 시작 = true; } catch (ioexception e) {e.printstacktrace (); System.exit (1); }} public void stop () {start = false; } @override public void run () {try {doconnect (); } catch (ioexception e) {e.printstacktrace (); System.exit (1); } // 선택기를 통한 (시작) {try {// 읽기 및 쓰기 이벤트가 있는지 여부에 관계없이 선택기는 각각 1s selector.select (1000); // 차단하면 적어도 하나의 등록 된 이벤트가 발생하는 경우에만 계속됩니다. // selector.select (); <selectionkey> keys = selector.selectedKeys (); 반복자 <selectionkey> it = keys.iterator (); selectionkey key = null; while (it.hasnext ()) {key = it.next (); it.remove (); try {handleInput (키); } catch (예외 e) {if (key! = null) {key.cancel (); if (key.channel ()! = null) {key.channel (). cose (); }}}}}}}}} catch (예외 e) {e.printstacktrace (); System.exit (1); }} // 선택기가 닫힌 후 관리되는 리소스는 (selector! = null) try {selector.close (); } catch (예외 e) {e.printstacktrace (); }} private void handinput (selectionKey 키)는 ioException {if (key.isvalid ()) {socketchannel sc = (socketchannel) key.channel (); if (key.isconnectable ()) {if (sc.finishConnect ()); else system.exit (1); } // if (key.isreadable ()) {// 바이 테버퍼를 만들고 1m 버퍼 바이트 버퍼 버퍼를 엽니 다. // 요청 코드 스트림을 읽고 바이트 숫자를 반환합니다. // 바이트를 읽고 바이트를 코딩하고 (readBytes> 0) {// 버퍼의 현재 한계를 위치 = 0으로 설정합니다. // 버퍼 바이트의 읽기 가능한 바이트 수를 기반으로 바이트 배열을 만듭니다 [] bytes = new Byte [buffer.remaining ()]; // 버퍼 읽기 가능한 바이트 배열을 새로 생성 된 배열 버퍼에 복사합니다. 문자열 결과 = 새 문자열 (바이트, "UTF-8"); System.out.println ( "클라이언트가 메시지를 받았습니다 :" + result); } // bytes 읽기가 무시되지 않습니다. // if (readBytes == 0); // 링크가 닫혀서 리소스를 자유롭게하면 else if (readBytes <0) {key.cancel (); sc.close (); }}}}} // 메시지 보내기 메시지 보내기 비동기 비동기 개인 무효 DOWRITE (socketchannel 채널, 문자열 요청) IoException {// 메시지를 바이트 배열 바이트 [] bytes = request.getBytes ()로 인코딩합니다. // 배열 용량을 기반으로 바이트 버퍼 생성 BYTEBUFFER writeBuffer = byteBuffer.Allocate (bytes.length); // 바이트 배열을 버퍼에 복사하는 writeBuffer.put (bytes); // 플립 작업 writeBuffer.flip (); // 바이트 배열 채널을 보내십시오 .Write (WriteBuffer); // ***** "쓰기 하프 패킷"을 처리하기위한 코드는 여기에 포함되지 않습니다} private void doconnect ()는 ioException {if (socketchannel.connect (new inetSocketAddress (host, port))); else socketchannel.register (selector, selectionkey.op_connect); } public void sendmsg (String msg)는 예외를 {socketchannel.register (selector, selectionkey.op_read); Dowrite (Socketchannel, MSG); }} 2.7. 데모 결과
먼저 서버를 실행하고 클라이언트를 실행하십시오.
패키지 com.anxpp.io.calculator.nio; java.util.scanner import; /** * 테스트 방법 * @Author Yangtao__anxpp.com * @version 1.0 */public class test {// 메인 메소드 테스트 @suppresswarnings ( "resource") public static void main (string [] args)은 예외를 던졌습니다 {// run server.start (); // 코드를 실행하는 클라이언트를 피하십시오 .Sleep (100); // client.Start ()를 실행합니다. while (client.sendmsg (새 스캐너 (system.in) .nextline ()); }} 우리는 또한 클라이언트를 별도로 실행할 수 있으며 효과는 동일합니다.
시험 결과 :
서버가 시작되었습니다. 포트 번호 : 123451+2+3+4+5+6 서버는 메시지를 받았습니다. 1+2+3+4+5+6 클라이언트는 메시지를 받았습니다. 211*2/3-4+5*6/7-8 서버는 메시지를 받았습니다.
여러 클라이언트를 실행하는 데 아무런 문제가 없습니다.
3. Aio Programming
NIO 2.0은 새로운 비동기 채널의 개념을 소개하고 비동기 파일 채널 및 비동기 소켓 채널의 구현을 제공합니다.
비동기 소켓 채널은 UNIX 네트워크 프로그래밍에서 이벤트 중심 I/O (AIIO)에 해당하는 진정한 비동기 비 차단 I/O입니다. 비동기식 읽기 및 쓰기를 달성하기 위해 등록 된 채널을 폴링하기 위해 너무 많은 선택기가 필요하지 않으므로 NIO 프로그래밍 모델을 단순화합니다.
코드를 업로드하십시오.
3.1. 서버 사이드 코드
섬기는 사람:
패키지 com.anxpp.io.calculator.aio.server; / ** * aio server * @author yangtao_anxpp.com * @version 1.0 */ public class server {private static int default_port = 12345; 개인 정적 AsyncServerHandler ServerHandle; 공개 휘발성 정적 긴 ClientCount = 0; public static void start () {start (default_port); } public static synchronized void start (int port) {if (serverhandle! = null) return; ServerHandle = 새로운 AsyncServerHandler (포트); 새 스레드 (ServerHandle, "Server"). start (); } public static void main (String [] args) {server.start (); }} Asyncserverhandler :
패키지 com.anxpp.io.calculator.aio.server; import java.io.ioexception; import java.net.inetSocketAddress; import java.nio.channels.asyncserversocketchannel; java.util.concurrent.countdownlatch import; 공개 클래스 AsyncServerHandler는 실행 가능한 {Public CountdownLatch Latch; 공개 비동기 Serversocketchannel 채널; public asyncserverhandler (int port) {try {// 서버 만들기 채널 작성 = asynchronousserversocketchannel.open (); // 바인드 포트 채널. System.out.println ( "서버가 시작되었습니다. 포트 번호 :" + 포트); } catch (ioexception e) {e.printstacktrace (); }} @override public void run () {// countdownlatch 초기화 // 함수 : 여기에서 실행중인 작업 세트를 완료하기 전에 현재 필드가 항상 차단되도록 허용합니다. 여기에서 실행 후 서버가 종료되는 것을 방지하기 위해 필드 블록이 여기에서 사용할 수 있도록 할 수 있습니다. // Connection Channel.cept (this, new accepthandler ()); try {latch.await (); } catch (InterruptedException e) {e.printstacktrace (); }}} accepthandler :
패키지 com.anxpp.io.calculator.aio.server; import java.nio.bytebuffer; import java.nio.channels.asynchronoussocketchannel; java.nio.channels.completionHandler 가져 오기; // 핸들러로 연결 공개 클래스 AccepThandler는 완료 핸들러를 구현합니다. <AsynchronousSocketchannel, AsyncServerHandler> {@override public void 완료 (asynchronoussocketchetchannel 채널, asyncserverhandler serverhandler) {// 다른 클라이언트에서 요청을 계속 받아들이십시오 .ClientCount ++; System.out.println("Number of connected clients:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //Create a new Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the service that receives message callbacks Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); }} ReadHandler:
package com.anxpp.io.calculator.aio.server; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import com.anxpp.io.utils.Calculator; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //Used to read semi-packet messages and send answers private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } // Processing after reading the message @Override public void completed(Integer result, ByteBuffer attachment) { //flip operation attachment.flip(); //According to byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("The server received a message: " + expression); String calrResult = null; try{ calrResult = Calculator.cal(expression).toString(); } catch(Exception e){ calrResult = "Calculator error: " + e.getMessage(); } //Send a message doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //Send a message private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //Asynchronous write data parameters are the same as the previous read channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //If it is not sent, continue to send until it is completed (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //Create a new Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //Asynchronously read the third parameter for the business that receives message callbacks. Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (ioexception e) {e.printstacktrace (); }}} OK,这样就已经完成了,其实说起来也简单,虽然代码感觉很多,但是API比NIO的使用起来真的简单多了,主要就是监听、读、写等各种CompletionHandler。此处本应有一个WriteHandler的,确实,我们在ReadHandler中,以一个匿名内部类实现了它。
下面看客户端代码。
3.2、Client端代码
고객:
package com.anxpp.io.calculator.aio.client; import java.util.Scanner; public class Client { private static String DEFAULT_HOST = "127.0.0.1"; private static int DEFAULT_PORT = 12345; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //Send a message to the server public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); 진실을 반환하십시오. } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }} AsyncClientHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsyncronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; 개인 int 포트; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //Create an asynchronous client channel clientChannel = AsynchronousSocketChannel.open(); } catch (ioexception e) {e.printstacktrace (); } } @Override public void run() { //Create CountDownLatch and wait latch = new CountDownLatch(1); //Initiate an asynchronous connection operation, the callback parameter is this class itself. If the connection is successful, the completed method clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (ioexception e) {e.printstacktrace (); } } //Connect the server successfully// means TCP completes three handshakes @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("Client connection to the server..."); } //Connecting to the server failed @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("Connecting to the server failed..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (ioexception e) {e.printstacktrace (); } } //Send a message to the server public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //Asynchronously write clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); }} WriteHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部数据的写入if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //读取数据ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("数据发送失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } ReadHandler:
package com.anxpp.io.calculator.aio.client; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客户端收到结果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("数据读取失败..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }这个API使用起来真的是很顺手。
3.3. 시험
시험:
package com.anxpp.io.calculator.aio; import java.util.Scanner; import com.anxpp.io.calculator.aio.client.Client; import com.anxpp.io.calculator.aio.server.Server; /** * Test method* @author yangtao__anxpp.com * @version 1.0 */ public class Test { //Test main method @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ //Run server Server.start(); //Avoid the client executing the code Thread.sleep(100); //Run client Client.start(); System.out.println("Please enter the request message:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); }}我们可以在控制台输入我们需要计算的算数字符串,服务器就会返回结果,当然,我们也可以运行大量的客户端,都是没有问题的,以为此处设计为单例客户端,所以也就没有演示大量客户端并发。
读者可以自己修改Client类,然后开辟大量线程,并使用构造方法创建很多的客户端测试。
下面是其中一次参数的输出:
服务器已启动,端口号:12345请输入请求消息:客户端成功连接到服务器...连接的客户端数:1123456+789+456服务器收到消息: 123456+789+456客户端收到结果:1247019526*56服务器收到消息: 9526*56客户端收到结果:533456...
AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。
下面就比较一下,几种I/O编程的优缺点。
4、各种I/O的对比
先以一张表来直观的对比一下:
具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。
5、附录
上文中服务端使用到的用于计算的工具类:
package com.anxpp.utils;import javax.script.ScriptEngine;import javax.script.ScriptEngineManager;import javax.script.ScriptException;public final class Calculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException{ return jse.eval(expression); }}위는이 기사의 모든 내용입니다. 모든 사람의 학습에 도움이되기를 바랍니다. 모든 사람이 wulin.com을 더 지원하기를 바랍니다.