RPC 또는 원격 절차 호출은 간단히 말하기위한 것입니다. 원격 컴퓨터에서 서비스를 호출하는 것은 로컬 서비스에 전화하는 것과 같습니다.
RPC는 HTTP 또는 TCP 프로토콜을 기반으로 할 수 있습니다. 웹 서비스는 HTTP 프로토콜을 기반으로하는 RPC입니다. 크로스 플랫폼 성능이 우수하지만 성능은 TCP 프로토콜을 기반으로 RPC만큼 좋지 않습니다. 두 가지 측면은 RPC의 성능에 직접적인 영향을 미칩니다. 하나는 전송 방법이고, 다른 하나는 직렬화입니다.
우리 모두 알다시피, TCP는 전송 계층 프로토콜, HTTP는 애플리케이션 계층 프로토콜이며 전송 계층은 응용 프로그램 계층 아래에 있습니다. 데이터 전송 측면에서 하위 계층이 더 빠릅니다. 따라서 일반적으로 TCP는 HTTP보다 빠릅니다. 직렬화와 관련하여 Java는 기본 직렬화 방법을 제공하지만 동시성이 높은 경우이 방법은 성능 병목 현상을 가져 오기 때문에 Protobuf, Kryo, Hessian, Jackson 등과 같은 일련의 우수한 직렬화 프레임 워크가 시장에서 등장했습니다.
높은 동시성을 지원하기 위해 전통적인 차단 IO는 분명히 적합하지 않으므로 비동기 IO, 즉 NIO가 필요합니다. Java는 NIO 솔루션을 제공하며 Java 7은 더 나은 NIO.2 지원을 제공합니다. Java로 NIO를 구현하는 것은 먼 것이 아니지만 NIO의 기술적 세부 사항에 익숙해야합니다.
분산 환경에서 다양한 노드에 서비스를 배포해야하며 서비스 등록을 통해 클라이언트는 현재 사용 가능한 서비스를 자동으로 발견하고 이러한 서비스를 호출 할 수 있습니다. 이를 위해서는 분산 환경에 모든 서비스 주소 (호스트 이름 및 포트 번호 포함)를 등록하려면 서비스 레지스트리 구성 요소가 필요합니다.
응용 프로그램, 서비스 및 서비스 레지스트리의 관계는 다음과 같습니다.
각 서버에서 여러 서비스를 게시 할 수 있습니다. 이 서비스는 호스트와 포트를 공유합니다. 분산 환경에서는 서버가 공동으로 서비스를 제공 할 수 있도록 제공됩니다. 또한 서비스 레지스트리의 단일 실패 지점을 방지하려면 클러스터 환경에 내장해야합니다.
이 기사는 경량 분산 RPC 프레임 워크를 개발하는 특정 프로세스를 보여줍니다. 이 프레임 워크는 TCP 프로토콜을 기반으로하며 NIO 기능을 제공하며 효율적인 직렬화 방법을 제공하며 서비스를 등록하고 발견 할 수 있습니다.
위의 기술 요구 사항에 따라 다음 기술 선택을 사용할 수 있습니다.
관련 Maven 의존성은 마지막 부록을 참조하십시오.
1 단계 : 서비스 인터페이스를 작성하십시오
공개 인터페이스 helloService {문자열 hello (문자열 이름);}이 인터페이스를 사용하기 위해 독립형 클라이언트 JAR 패키지에 배치하십시오.
2 단계 : 서비스 인터페이스의 구현 클래스 작성
@rpcservice (helloService.class) // 원격 인터페이스를 지정하여 공개 클래스 HelloServiceImpl은 HelloService {@override public String hello (String Name) {return "Hello!" + 이름; }}RPCService 주석을 사용하여 서비스 인터페이스의 구현 클래스를 정의하십시오. 구현 클래스가 여러 인터페이스를 구현할 수 있으므로 구현 클래스의 원격 인터페이스를 지정해야하므로 원격 인터페이스 인 프레임 워크를 알려야합니다.
rpcservice 코드는 다음과 같습니다.
@TARGET ({ElementType.type})@resentention(retentionpolicy.runtime)@component // spring public @interface rpcservice {class <?> value ();}에 의해 스캔 될 수 있음을 나타냅니다.이 주석에는 Spring의 구성 요소 주석의 특성이 있으며 스프링으로 스캔 할 수 있습니다.
이 구현 클래스는 서버 JAR 패키지에 배치되며 서비스 시작을위한 일부 서버 구성 파일 및 부트 스트랩 프로그램도 제공합니다.
3 단계 : 서버 구성
서버 스프링 구성 파일은 Spring.xml이라는 이름이며 내용은 다음과 같습니다.
<beans ...> <context : component-scan base-package = "com.xxx.rpc.sample.server"/> <context : property-placeholder location = "classpath : config.properties"/> <!-서비스 등록 구성 요소-> <bean id = "Servicereg "> regitorator-arg 이름을 구성합니다. value = "$ {registry.address}"/> </bean> <!-RPC 서버 구성-> <bean id = "rpcserver"> <constructor-arg name = "serverAddress"value = "$ {server.address}"/> <constructor-arg name = "serviceRegistry" "serviceRegistry"/> </bean "특정 구성 매개 변수는 config.properties 파일에 있으며 내용은 다음과 같습니다.
# Zookeeper Server Registry.address = 127.0.0.1 : 2181# rpc server server.address = 127.0.0.1 : 8000
위의 구성은 로컬 Zookeeper 서버가 연결되어 있고 RPC 서비스가 Port 8000에서 릴리스되었음을 나타냅니다.
4 단계 : 서버를 시작하고 서비스를 게시하십시오
스프링 구성 파일을로드하려면 서비스를 게시하려면 부트 로더를 작성하십시오.
public class rpcbootstrap {public static void main (String [] args) {new ClassPathXmlApplicationContext ( "spring.xml"); }}서버를 시작하려면 rpcbootstrap 클래스의 기본 메소드를 실행하지만 아직 구현되지 않은 두 가지 중요한 구성 요소, 즉 ServiceRegistry 및 RPCServer가 있습니다. 특정 구현 세부 정보는 다음과 같습니다.
5 단계 : 서비스 등록을 구현합니다
Zookeeper 클라이언트를 사용하여 서비스 등록 기능을 쉽게 구현할 수 있습니다. 서비스 코드는 다음과 같습니다.
공개 클래스 서비스 척도 {개인 정적 최종 로거 로거 = loggerfactory.getLogger (ServiceRegistry.class); Private CountdownLatch Latch = New CountdownLatch (1); 개인 문자열 레지스트리어 드레스; 공무원 (String RegistryAddress) {this.registryAddress = registryAddress; } public void Register (문자열 데이터) {if (data! = null) {Zookeeper ZK = ConnectServer (); if (zk! = null) {CreateNode (zk, data); }}} private Zookeeper ConnectServer () {Zookeeper zk = null; try {zk = new Zookeeper (registryAddress, constant.zk_session_timeout, new watcher () {@override public void process (watchedevent event) {if (event.getState () == event.keeperstate.syncConnected) {latch.countdown ();}}}); latch.await (); } catch (ioexception | InterruptedException e) {logger.error ( "", e); } return zk; } private void createNode (Zookeeper ZK, String Data) {try {byte [] bytes = data.getBytes (); 문자열 path = zk.create (constant.zk_data_path, 바이트, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential); logger.debug ( "Zookeeper Node 만들기 ({} => {})", Path, Data); } catch (recoperexception | InterruptedException e) {logger.error ( "", e); }}}그중 모든 상수는 상수를 통해 구성됩니다.
공개 인터페이스 상수 {int zk_session_timeout = 5000; 문자열 zk_registry_path = "/registry"; 문자열 zk_data_path = zk_registry_path + "/data";}참고 : 먼저, Zookeeper Client Line을 사용하여 영구 노드를 작성/레지스트리하여 모든 임시 서비스 노드를 저장해야합니다.
6 단계 : RPC 서버 구현
Netty를 사용하면 NIO를 지원하는 RPC 서버를 구현할 수 있습니다. 서비스 주소를 등록하려면 ServiceRegistry를 사용해야합니다. rpcserver 코드는 다음과 같습니다.
공개 클래스 rpcserver는 ApplicationContextAware, 초기화 {private static final logger = loggerfactory.getLogger (rpcserver.class)를 구현합니다. 개인 문자열 ServerAddress; 개인 서비스 요법 서비스; 개인지도 <문자열, 객체> 핸들러 맵 = New Hashmap <> (); // 인터페이스 이름과 서비스 객체 사이의 매핑 관계를 저장 공개 rpcserver (String ServerAddress) {this.serveraddress = serverAddress; } public rpcserver (String ServerAddress, ServiceRegistry ServiceRegistry) {this.serveraddress = ServerAddress; this.serviceregistry = ServiceRegistry; } @override public void setApplicationContext (ApplicationContext CTX)는 beansexception {map <string, object> serviceBeanMap = ctx.getBeanSwitHannotation (rpcservice.class); // rpcservice 주석이있는 모든 스프링을 가져옵니다. if (maputils.isnotempty (serviceBeanMap)) {for (Object ServiceBean : serviceBeanMap.Values ()) {String InterfaceName = serviceBean.getClass (). getAnnotation (rpcservice.class) .value (). Handlermap.put (interfaceame, servicebean); }}} @override public void approperTiesset () throws exception {eventLoopgroup bossgroup = new nioeventLoopgroup (); EventLoopGroup WorkerGroup = NEW NIOEVENTLOOPGROUP (); {serverbootstrap bootstrap = new ServerBootstrap (); bootstrap.group (bossgroup, workergroup) .Channel (nioserversocketchanchann.class) .ChildHandler (new channelInitializer <socketchannel> () {@override public void initchannel (Socketchannel Channel)은 {channel.pipeline () .adlast (new rpcdecoder (rpcrequest.code). AddLast (새로운 rpcencoder (rpcresponse.class)) // rpc 응답을 인코딩합니다 (응답을 반환). 문자열 [] array = serverAddress.split ( ":"); 문자열 호스트 = 배열 [0]; int port = integer.parseint (배열 [1]); channefluture future = bootstrap.bind (호스트, 포트) .sync (); logger.debug ( "서버는 포트에서 시작되었습니다 {}", 포트); if (serviceRegistry! = null) {serviceRegistry.register (serverAddress); // 등록 서비스 주소} future.channel (). CloseFuture (). sync (); } 마침내 {workerGroup.shutdownGracely (); bossgroup.shutdowngracely (); }}}위의 코드에는 rpcrequest와 rpcresponse와 같은 두 가지 중요한 pojos가 설명되어야합니다.
rpcrequest를 사용하여 RPC 요청을 캡슐화하면 코드는 다음과 같습니다.
공개 클래스 rpcrequest {private String requestId; 개인 문자열 클래스 이름; 개인 문자열 메소드 이름; 개인 클래스 <?> [] ParameterTypes; 개인 객체 [] 매개 변수; // getter/setter ...}rpcresponse를 사용하여 RPC 응답을 캡슐화하면 코드는 다음과 같습니다.
공개 클래스 rpcresponse {private String requestId; 비공개 던지기 오류; 개인 대상 결과; // getter/setter ...}rpcdecoder를 사용하여 RPC 디코딩을 제공하고 Netty의 ByTeTomessagedEcoder Abstract Decode 메소드를 확장하십시오. 코드는 다음과 같습니다.
공개 클래스 rpcdecoder는 bytetomessagedecoder {private class <?> genericclass를 확장합니다. public rpcdecoder (class <?> genericclass) {this.genericclass = genericclass; } @override public void decode (channelHandlerContext ctx, bytebuf in, list <bood> out)는 예외를 {if (in.readableBytes () <4) {return; } in.markReaderIndex (); int datalength = in.readint (); if (datalength <0) {ctx.close (); } if (in.readableBytes () <datalength) {in.resetReaderIndex (); 반품; } byte [] data = new Byte [Datalength]; in.readBytes (데이터); Object obj = serializationutil.deserialize (데이터, genericclass); out.add (obj); }}rpcencoder를 사용하여 RPC 인코딩을 제공하고 Netty의 MessagetoByTeencoder Abstract Class Encode 메소드를 확장하십시오. 코드는 다음과 같습니다.
공개 클래스 rpcencoder는 messagetobyteencoder {private class <?> genericclass를 확장합니다. public rpcencoder (class <?> genericclass) {this.genericclass = genericclass; } @override public void encode (Chann out.writeint (data.length); out.writeBytes (데이터); }}}직렬화 도구 클래스를 작성하고 Protostuff를 사용하여 직렬화를 구현하십시오.
public class serializationutil {private static map <class <?>, schema <?>> cachedschema = new concurrenthashmap <> (); 개인 정적 objenesis objenesis = 새로운 objenesisstd (true); private serializationutil () {} @suppresswarnings ( "선택 취소") private static <t> schema <t> getSchema (class <t> cls) {schema <t> schema = (schema <t>) cachedschema.get (cls); if (schema == null) {schema = runtimeschema.createfrom (cls); if (schema! = null) {cachedschema.put (cls, schema); }} 반환 스키마; } @SuppressWarnings ( "확인되지 않은") public static <t> byte [] serialize (t obj) {class <t> cls = (class <t>) obj.getClass (); LinkedBuffer Buffer = LinkedBuffer.Allocate (LinkedBuffer.Default_Buffer_Size); {schema <t> schema = getschema (cls); protostuffioutil.tobytearray (obj, schema, buffer)를 반환합니다. } catch (예외 e) {throw new neveralstateException (e.getMessage (), e); } 마침내 {buffer.clear (); }} public static <t> t deserialize (byte [] data, class <t> cls) {try {t message = (t) objenesis.newinstance (cls); 스키마 <t> schema = getSchema (cls); protostuffioutil.mergefrom (데이터, 메시지, 스키마); 반환 메시지; } catch (예외 e) {throw new neveralstateException (e.getMessage (), e); }}}위의 내용은 objenesis를 사용하여 객체를 인스턴스화하는데, 이는 Java 반사보다 강력합니다.
참고 : 다른 직렬화 프레임 워크를 교체 해야하는 경우 SerializationUtil을 수정하십시오. 물론,이를 구현하는 더 좋은 방법은 사용할 직렬화 방법을 결정하는 구성 항목을 제공하는 것입니다.
RPCHANDLER에서 RPC 요청을 처리하려면 Netty의 SimpleChannelInboundHandler Abstract 클래스를 확장하면 코드가 다음과 같습니다.
공개 클래스 rpchandler 확장 simplechannelInboundHandler <RPCrequest> {private static final logger = loggerfactory.getLogger (rpchandler.class); 비공개 최종지도 <문자열, 객체> 핸들러 맵; public rpchandler (map <string, object> handlermap) {this.handlermap = handlermap; } @override public void ChannelRead0 (최종 Chann response.setRequestId (request.getRequestId ()); 시도 {object result = handle (요청); Response.SetResult (결과); } catch (Throwable t) {response.seterror (t); } ctx.writeAndflush (응답) .addlistener (channelfuturelistener.close); } 개인 객체 핸들 (rpcrequest request) 던지기 가능 {String className = request.getClassName (); Object ServiceBean = handlermap.get (className); class <?> serviceclass = servicebean.getClass (); String MethodName = request.getMethodName (); class <?> [] parametertypes = request.getParameterTypes (); 개체 [] 매개 변수 = request.getParameters (); /*메소드 메소드 = serviceClass.getMethod (MethodName, ParameterTypes); method.setAccessible (true); return method.invoke (ServiceBean, 매개 변수);*/ FastClass ServiceFastClass = FastClass.create (ServiceClass); FastMethod ServiceFastMethod = ServiceFastClass.getMethod (MethodName, ParameterTypes); Return ServiceFastMethod.invoke (ServiceBean, 매개 변수); } @override public void ExceptionCaught (Chann ctx.close (); }}Java Reflection을 사용하여 발생하는 성능 문제를 피하기 위해 Fastclass 및 FastMethod와 같은 CGLIB에서 제공하는 반사 API를 사용할 수 있습니다.
7 단계 : 클라이언트 구성
또한 Spring 구성 파일을 사용하여 RPC 클라이언트를 구성하십시오. Spring.xml 코드는 다음과 같습니다.
<beans ...> <context : property-placeholder location = "classpath : config.properties"/> <!-서비스 검색 구성 요소 구성-> <bean id = "serviceDiscovery"> <constructor-arg name = "retuistryAddress"value = "$ {registry.address}"/> <!-configure rpc proxy-> bean id = "rpcproxy"> <생성자-arg name = "servicedscovery"ref = "serviceedscovery"/> </bean> </beans>config.properties 특정 구성을 제공합니다.
# Zookeeper Server Registry.address = 127.0.0.1 : 2181
8 단계 : 서비스 검색 구현
또한 Zookeeper를 사용하여 서비스 검색 기능을 구현하고 다음 코드를 참조하십시오.
공개 클래스 서비스 {private static final logger logger = loggerfactory.getLogger (serviceScovery.class); Private CountdownLatch Latch = New CountdownLatch (1); 개인 휘발성 목록 <String> Datalist = New ArrayList <> (); 개인 문자열 레지스트리어 드레스; 공무원 (String registryAddress) {this.registryAddress = registryAddress; Zookeeper ZK = ConnectServer (); if (zk! = null) {watchnode (zk); }} public String discover () {문자열 data = null; int size = datalist.size (); if (size> 0) {if (size == 1) {data = datalist.get (0); logger.debug ( "데이터 만 사용 : {}", data); } else {data = datalist.get (ThreadLocalRandom.current (). nextInt (size)); logger.debug ( "랜덤 데이터 사용 : {}", data); }} 반환 데이터; } private Zookeeper ConnectServer () {kookeeper zk = null; {zk = new Zookeeper (registryAddress, constant.zk_session_timeout, new watcher () {@override public void process (watchedevent event) {if (event.getState () == event.keeperstate.syncConnected) {latch.countdown ();}}); latch.await (); } catch (ioexception | InterruptedException e) {logger.error ( "", e); } return zk; } private void watchNode (Final Zookeeper ZK) {try {list <string> nodelist = zk.getchildren (constant.zk_registry_path, new Watcher () {@override public void process (watchedevent event) {if (event.gettype () == event.eventType.nodeChildrenchanged) {node (zk)}; }); List <string> datalist = new ArrayList <> (); for (문자열 노드 : nodelist) {byte [] bytes = zk.getData (constant.zk_registry_path + "/" + node, false, null); datalist.add (새 문자열 (바이트)); } logger.debug ( "노드 데이터 : {}", Datalist); this.datalist = Datalist; } catch (recoperexception | InterruptedException e) {logger.error ( "", e); }}}9 단계 : RPC 에이전트 구현
여기서 우리는 Java가 제공하는 동적 프록시 기술을 사용하여 RPC 프록시를 구현합니다 (물론 CGLIB를 사용하여 구현할 수도 있음). 특정 코드는 다음과 같습니다.
공개 클래스 rpcproxy {private String serverAddress; 개인 서비스 서비스 서비스; public rpcproxy (String ServerAddress) {this.serveraddress = serverAddress; } public rpcproxy (ServicedScovery ServicedScovery) {this.servicedscovery = ServiceScovery; } @suppresswarnings ( "선택 취소") public <t> t public <t> t create (class <?> interfaceclass) {return (t) proxy.newproxyinstance (interfaceclass.getclassloader (), 새로운 클래스 <?> args) {rpcrequest request (). request.getsparametertypes () integer.parseint (array [1]); rpcclient (호스트, 포트); } else {return response.getResult (); }}}); }}RPCClient 클래스를 사용하여 RPC 클라이언트를 구현하려면 Netty가 제공하는 SimpleChannelInboundHandler Abstract 클래스 만 확장하면 다음과 같습니다.
공개 클래스 rpcclient 확장 simplechannelInboundHandler <rpcresponse> {private static final logger = loggerfactory.getLogger (rpcclient.class); 개인 문자열 호스트; 개인 int 포트; 개인 RPCRPRONSER 응답; 개인 최종 객체 obj = new Object (); public rpcclient (문자열 호스트, int port) {this.host = host; this.port = 포트; } @override public void ChannelRead0 (Chann 동기화 (obj) {obj.notifyall (); // 응답을 받고, 스레드 깨우기}} @override public void ExceptionCaught (chant ctx.close (); } public rpcresponse send (rpcrequest request) 예외 {eventLoopgroup Group = new nioeventLoopGroup (); {bootstrap bootstrap = new bootstrap (); bootstrap.group (group) .Channel (niosocketchannel.class) .Handler (new channelInitializer <socketchannel> () {@override public void initchannel (socketchannel 채널)은 예외 {channel.pipeline () .addlast (새로운 rpcencoder (rpcrequest.class)) // orpc 요청 (요청). rpcdecoder (rpcresponse.class)) // rpc 응답을 해독합니다 (응답 처리) .addlast (rpcclient.this); channefluture future = bootstrap.connect (호스트, 포트) .sync (); Future.Channel (). WriteAndFlush (request) .sync (); 동기화 (obj) {obj.wait (); // 응답이 수신되지 않았으며, 스레드가 대기를 유발합니다} if (response! = null) {future.channel (). CloseFuture (). sync (); } 반환 응답; } 마침내 {group.shutdownGracely (); }}}10 단계 : RPC 요청을 보냅니다
Junit을 사용하여 다음 코드와 함께 Spring과 함께 단위 테스트를 작성하십시오.
@RunWith (SpringJunit4classRunner.class) @ContextConfiguration (locations = "classPath : Spring.xml") 공개 클래스 HelloServicetest {@autowired private rpcproxy rpcproxy; @test public void hellotest () {helloservice helloService = rpcproxy.create (helloService.class); 문자열 결과 = HelloService.Hello ( "World"); assert.assertequals ( "Hello! World", 결과); }}위의 단위 테스트를 실행하고 예상치 못한 일이 발생하지 않으면 녹색 막대가 나타납니다.
요약
이 기사는 Spring + Netty + Protostuff + Zookeeper를 통해 경량 RPC 프레임 워크를 구현합니다. Spring을 사용하여 종속성 주입 및 매개 변수 구성을 제공하고 Netty를 사용하여 NIO 데이터 전송을 구현하고 Protostuff를 사용하여 객체 직렬화를 구현하며 Zookeeper를 사용하여 서비스 등록 및 검색을 구현합니다. 이 프레임 워크를 사용하면 분산 환경의 모든 노드에 서비스를 배포 할 수 있습니다. 클라이언트는 원격 인터페이스를 통해 서버의 특정 구현을 호출하여 서버 및 클라이언트의 개발을 완전히 분리하여 대규모 분산 응용 프로그램의 구현에 대한 기본 지원을 제공합니다.
부록 : Maven 의존성
<!-junit-> <pectinement> <groupId> junit </groupId> <artifactid> junit </artifactId> <버전> 4.11 </version> <cope> test </scope> </fectionency> <! <slf4j-> <버전> 1.7.7 </version> </dependency> <!-스프링-> <pectionency> <groupid> org.springframework </groupid> <artifactid> spring-context </artifactid> <bersion> 3.2.12.Release </version> </dependency> <groupid> org.springframework </groupid> <버전> 3.2.12. Release </version> <cope> test </scope> </fectionency> <!-netty-> <pectionement> <groupId> io.netty </groupId> <ArtifactID> netty-all </artifactid> <버전> 4.0.24.final </version> <!-protostuff-> 의존성> <groupid> com.dyuproject.protostuff </groupid> <artifactid> protostuff-core </artifactid> <bersion> 1.0.8 </version> </depectency> <!-Zookeeper-> <pecientency> <groupid> org.apache.zookeeper <artifactid> Zookeper </artifactid> <버전> 3.4.6 </version> </dependency> <!-Apache Commons Collection-> <pectionency> <groupId> org.apache.commons </groupId> Commons-Collections4 </artifactId> <버전> 4.0 </version> </version> <!-dependency-> groupid> org.objenesis> org.objenesis. <artifactid> objenesis </artifactid> <bersion> 2.1 </version> </fectionency> <!-cglib-> <groupid> cglib </groupid> <artifactid> cglib </artifactid> <버전> 3.1 </version> </dependency>