머리말:
최근에 Hadoop의 RPC (원격 프로 시저 호출 프로토콜)를 분석 할 때 기본 네트워크 기술을 이해하지 않고 네트워크를 통해 원격 컴퓨터 프로그램의 서비스를 요청하는 프로토콜입니다. http://baike.baidu.com/view/32726.htm) 메커니즘을 참조 할 수 있습니다. Hadoop의 RPC 메커니즘 구현은 주로 두 가지 기술을 사용하는 것으로 나타났습니다. 동적 프록시 (블로그를 참조 할 수 있습니다 : http://weixiaolu.iteye.com/147774) 및 Java nio. Hadoop의 RPC 소스 코드를 올바르게 분석하려면 Java Nio의 원칙과 구체적인 구현을 먼저 연구해야한다고 생각합니다.
이 블로그에서는 주로 두 방향에서 Java Nio를 분석합니다.
목차 :
하나. Java Nio와 차단 I/O의 차이
1. I/O 통신 모델 차단
2. Java NIO 원칙 및 통신 모델 2. Java NIO 서버 및 클라이언트 코드 구현
특정 분석 :
하나. Java Nio와 차단 I/O의 차이
1. I/O 통신 모델 차단
지금 I/O 차단에 대한 특정 이해가있는 경우 InputStream.Read () 메소드를 호출 할 때 I/O 차단이 차단된다는 것을 알고 있습니다. 반환하기 전에 데이터가 도착하거나 타임 아웃 할 때까지 기다립니다. 마찬가지로 serversocket.accrect () 메소드를 호출 할 때 반환하기 전에 클라이언트 연결이있을 때까지 차단됩니다. 각 클라이언트가 연결되면 서버는 클라이언트의 요청을 처리하기 위해 스레드를 시작합니다. I/O 차단의 통신 모델 다이어그램은 다음과 같습니다.
신중하게 분석하면 I/O 차단의 단점이 있음을 분명히 알 수 있습니다. 차단 I/O 통신 모델을 기반으로 두 가지 단점을 요약했습니다.
1. 클라이언트가 많으면 많은 처리 스레드가 생성됩니다. 그리고 각 스레드는 스택 공간과 CPU 시간을 차지합니다.
2. 차단은 컨텍스트 전환이 빈번한 것으로 이어질 수 있으며 대부분의 컨텍스트 전환은 의미가 없을 수 있습니다.
이 경우 비 블로킹 I/O에는 응용 프로그램 전망이 있습니다.
2. Java Nio 원칙 및 커뮤니케이션 모델
Java Nio는 JDK1.4에서 시작되었으며 "새로운 I/O"또는 비 차단 I/O라고 할 수 있습니다. Java Nio의 작동 방식은 다음과 같습니다.
1. 전용 스레드는 모든 IO 이벤트를 처리하고 배포를 담당합니다.
2. 이벤트 중심 메커니즘 : 이벤트를 동시에 모니터링하지 않고 이벤트가 도착하면 트리거됩니다.
3. 스레드 커뮤니케이션 : 스레드는 대기, 알림 및 기타 수단을 통해 통신합니다. 모든 컨텍스트 스위치가 의미가 있는지 확인하십시오. 불필요한 스레드 스위칭을 줄입니다.
정보를 읽은 후, 나는 이해하는 Java Nio의 작동 회로도를 게시했습니다.
(참고 : 각 스레드의 처리 흐름은 아마도 데이터, 디코딩, 컴퓨팅 처리, 인코딩 및 응답 보내기 일 것입니다.)
Java Nio 서버는 모든 IO 이벤트를 처리하기 위해 특수 스레드 만 시작하면됩니다. 이 통신 모델은 어떻게 구현됩니까? 하하, 그 미스터리를 함께 탐색합시다. Java Nio는 단방향 스트림이 아닌 데이터 전송을 위해 양방향 채널을 사용하며 관심있는 이벤트는 채널에 등록 될 수 있습니다. 총 4 개의 이벤트가 있습니다.
| 이벤트 이름 | 해당 값 |
| 서버는 클라이언트 연결 이벤트를 수신합니다 | selectionkey.op_accept (16) |
| 클라이언트 연결 서버 이벤트 | selectionkey.op_connect (8) |
| 이벤트를 읽습니다 | selectionkey.op_read (1) |
| 이벤트 작성 | selectionkey.op_write (4) |
서버와 클라이언트는 각각 하나 이상의 채널에서 이벤트를 감지 할 수있는 선택기를 호출하는 채널을 관리하는 객체를 유지 관리합니다. 서버를 예로 들어 봅시다. 읽기 이벤트가 서버 선택기에 등록되면 클라이언트는 어느 시점에서 일부 데이터를 서버로 보냅니다. I/O를 차단할 때는 read () 메소드를 호출하여 데이터를 차단하고 NIO 서버는 선택기에 읽기 이벤트를 추가합니다. 서버의 처리 스레드는 셀렉터에 액세스하기 위해 설문 조사합니다. 선택기에 액세스 할 때 관심있는 이벤트가 도착하는 경우 이러한 이벤트를 처리합니다. 이벤트 이벤트가 도착하지 않으면 관심 이벤트가 도착할 때까지 처리 스레드가 차단됩니다. 아래는 내가 이해하는 Java Nio의 통신 모델의 개략도입니다.
둘. Java NIO 서버 및 클라이언트 코드 구현
Java Nio를 더 잘 이해하기 위해 다음은 서버 및 클라이언트를위한 간단한 코드 구현입니다.
섬기는 사람:
패키지 cn.nio; import java.io.ioexception; import java.net.inetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.iterator; /*** nio server* @author small path*/public class nioserver {// 채널 관리자 개인 선택기 선택기; / ** * Serversocket 채널을 받고 채널에서 일부 초기화 작업을 수행합니다 * @param port bound port * @throws ioexception */ public void initserver (int port)는 ioexception {// serverSocket 채널 serversocketchetchannel serverChannel을 얻습니다. // 채널을 비 블로킹 ServerChannel.configureBlocking (false)으로 설정합니다. //이 채널에 해당하는 서버 소켓 바인드 포트 포트 ServerChannel.socket (). bind (new inetSocketAddress (port)); // 채널 관리자를 얻습니다. this.selector = selector.open (); // 채널 관리자를 채널에 바인딩하고 채널의 selectionKey.op_accept 이벤트를 등록하십시오. 이벤트를 등록한 후 // 이벤트가 도착하면 selector.select ()가 반환됩니다. 이벤트가 셀렉터에 도달하지 않으면 select ()가 차단됩니다. ServerChannel.register (selector, selectionKey.op_accept); } /*** 폴링을 사용하여 선택기에 처리 해야하는 이벤트가 있는지 확인하십시오. 그렇다면 * @throws ioexception */ @suppresswarnings ( "선택 취소") public void listen ()가 ioexception {system.out.println ( "서버 측 시작 시작!")을 처리합니다. // (true) {// 등록 된 이벤트가 도착하면 메소드가 반환됩니다. 그렇지 않으면 메소드가 계속 차단합니다. selector.select (); // 선택기에서 선택한 항목의 반복자를 가져 오면 선택한 항목은 등록 된 이벤트 반복자 항목 = this.selector.Selector.SelectedKeys (). iterator (); while (ite.hasnext ()) {selectionkey key = (selectionKey) ite.next (); // item.remove ()의 반복 처리를 방지하기 위해 선택한 키를 삭제합니다. // 클라이언트는 (key.isacceptable ()) {serversocketchannel server = (serversocketchannel) key .Channel (); // 클라이언트에 연결할 채널을 얻습니다. socketchannel 채널 = Server.Accept (); // 비 블로킹 채널로 설정 .ConfigureBlocking (false); // 여기에 클라이언트에 정보를 보낼 수 있습니다. channel.write (bytebuffer.wrap (새 문자열 ( "클라이언트에게 메시지 보내기"). getBytes ()); // 클라이언트와의 연결이 성공하면 클라이언트의 정보를 받으려면 채널에 대한 읽기 권한을 설정해야합니다. channel.register (this.selector, selectionkey.op_read); // 읽은 이벤트가 얻어졌습니다} else if (key.isReadable ()) {read (key); }}}} / ** * 클라이언트가 보낸 메시지를 읽는 처리 이벤트 * @param key * @throws ioException * / public void read (selectionKey Key) IOException {// 서버가 읽을 수 있습니다 : 이벤트가 발생하는 소켓 채널을 가져옵니다. Socketchannel Channel = (Socketchannel) key.channel (); // 읽기 버퍼 바이트 버퍼 작성 버퍼 = bytebuffer.allocate (10); 채널. 읽기 (버퍼); 바이트 [] data = buffer.array (); 문자열 msg = new String (data) .trim (); System.out.println ( "서버는"+msg); 바이트 버퍼 outbuffer = bytebuffer.wrap (msg.getBytes ()); Chann Server.initserver (8000); Server.Listen (); }} 고객:
패키지 cn.nio; import java.io.ioexception; import java.net.inetSocketAddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.socketchannel; import java.util.iterator; /*** nio 클라이언트* @Author Small Path*/public class nioclient {// 채널 관리자 개인 선택기 선택기; / ** * 소켓 채널을 가져 와서 채널의 초기화 * @Param IP * @param 포트에 연결된 서버의 IP * @Throws ioException */ public void initclient (String IP, int Port)에 연결된 서버의 포트 번호 ioexception {// 소켓 채널 채널 채널 채널 = Socketchannal.open (); // 채널을 비 블로킹 채널로 설정합니다 .ConfigureBlocking (false); // 채널 관리자를 얻습니다. this.selector = selector.open (); // 클라이언트가 서버에 연결합니다. 실제로 메소드 실행은 연결을 구현하지 않습니다. // channel.finishConnect ()를 사용해야합니다. Connection Channel.connect를 완성하기위한 Listen () 메소드에서 (New InetSocketAddress (IP, Port)); // 채널 관리자를 채널에 바인딩하고 채널의 selectionKey.op_connect 이벤트를 등록하십시오. channel.register (selector, selectionKey.op_connect); } /*** 폴링을 사용하여 선택기에 처리 해야하는 이벤트가 있는지 확인하십시오. 그렇다면 * @throws ioexception */ @suppresswarnings ( "확인되지 않은") public void listen ()는 ioexception {// selector에 액세스하기 위해 폴링 (true) {selector.select (); // 선택기 반복자 iterator iterator iter = this.selector.selectedKeys (). iterator (); while (ite.hasnext ()) {selectionkey key = (selectionKey) item.next (); // item.remove ()의 반복 처리를 방지하기 위해 선택한 키를 삭제합니다. // 연결 이벤트는 (key.iscOnconCnectable ()) {socketchannel 채널 = (Socketchannel) key .Channel (); // 연결이 연결된 경우 (channel.isconnectionPending ()) {channer.finishConnect (); } // 비 블로킹 채널로 설정 .ConfigureBlocking (false); // 서버 채널에 정보를 보낼 수 있습니다 (bytebuffer.wrap (새 문자열 ( "서버로 메시지 보내기"). getBytes ()); // 서버와의 연결이 성공한 후 서버 정보를 수신하기 위해서는 채널을 읽기로 설정해야합니다. channel.register (this.selector, selectionkey.op_read); // 읽은 이벤트가 얻어졌습니다} else if (key.isReadable ()) {read (key); }}}} /*** 서버에서 보낸 메시지를 읽는 처리 이벤트* @param key* @throws ioexception* /public void read (selectionkey key)는 ioexception {// 서버의 읽기 메소드와 동일 {// start test* @throws ioexception* /public static void main (strings) ioexemention {nioexemention {niioxemention {new exmention void [] args). nioclient (); client.initclient ( "localhost", 8000); client.listen (); }}요약:
마지막으로, 동적 프록시 및 Java NIO 분석이 완료되었습니다. 하하, 다음은 Hadoop의 RPC 메커니즘의 소스 코드를 분석하는 것입니다. 블로그 주소는 http://weixiaolu.itey.com/blog/1504898입니다. 그러나 Java Nio에 대한 이해에 반대 의견이 있으면 함께 토론 할 수 있습니다.
재 인쇄가 필요한 경우 소스를 표시하십시오 : http://weixiaolu.itey.com/blog/1479656