RPC、またはリモートプロシージャコールは、単純に言うだけです。リモートコンピューターでサービスを呼び出すことは、ローカルサービスを呼び出すようなものです。
RPCは、HTTPまたはTCPプロトコルに基づくことができます。 Webサービスは、HTTPプロトコルに基づくRPCです。クロスプラットフォームのパフォーマンスは優れていますが、そのパフォーマンスはTCPプロトコルに基づいてRPCほど良くありません。 2つの側面がRPCのパフォーマンスに直接影響します。1つは伝送方法、もう1つはシリアル化です。
私たち全員が知っているように、TCPは輸送層プロトコル、HTTPはアプリケーションレイヤープロトコルであり、輸送層はアプリケーション層の下にあります。データ送信に関しては、下層がより速くなります。したがって、一般に、TCPはHTTPよりも高速でなければなりません。シリアル化に関しては、Javaはデフォルトのシリアル化方法を提供しますが、高い並行性の場合、この方法はいくつかのパフォーマンスボトルネックをもたらすため、Protobuf、Kryo、Kryo、Hessian、Jacksonなどの一連の優れたシリアル化フレームワークが登場しました。
高い並行性をサポートするために、従来のブロッキングIOは明らかに適していないため、非同期IO、つまりNIOが必要です。 JavaはNIOソリューションを提供し、Java 7はNIO.2サポートも優れています。 JavaでNIOを実装することは遠いことではありませんが、NIOの技術的な詳細に精通する必要があります。
分散環境でさまざまなノードでサービスを展開する必要があります。サービス登録を通じて、クライアントは現在利用可能なサービスを自動的に発見して、これらのサービスに電話することができます。これには、分散環境ですべてのサービスアドレス(ホスト名とポート番号を含む)を登録するためのサービスレジストリコンポーネントが必要です。
アプリケーション、サービス、およびサービスレジストリの関係を以下の図に示します。
各サーバーで複数のサービスを公開できます。これらのサービスはホストとポートを共有しています。分散環境では、サーバーが提供され、共同でサービスを提供します。さらに、サービスレジストリの単一の障害点を防ぐには、クラスター環境に組み込む必要があります。
この記事では、軽量分散RPCフレームワークを開発する特定のプロセスを明らかにします。このフレームワークは、TCPプロトコルに基づいており、NIO機能を提供し、効率的なシリアル化方法を提供し、サービスを登録および発見する機能も備えています。
上記の技術的要件によれば、次のテクノロジー選択を使用できます。
関連するMaven依存関係については、最後の付録を参照してください。
ステップ1:サービスインターフェイスを作成します
パブリックインターフェイスhelloservice {string hello(string name);}このインターフェイスを使用するスタンドアロンクライアントJARパッケージに配置します。
ステップ2:サービスインターフェイスの実装クラスを書き込む
@rpcservice(helloservice.class)//リモートインターフェイスの指定パブリッククラスHelloserviceImplを実装しています{@Override public String hello(string name){return "hello!" +名前; }}RPCServiceアノテーションを使用して、サービスインターフェイスの実装クラスを定義します。実装クラスは複数のインターフェイスを実装できるため、実装クラスにリモートインターフェイスを指定する必要があるため、リモートインターフェイスであるフレームワークを通知する必要があります。
RPCServiceコードは次のとおりです。
@target({elementType.type})@retention(retentionpolicy.runtime)@component // spring public@interface rpcservice {class <?> value();}によってスキャンできることを示します。この注釈には、Springのコンポーネント注釈の特性があり、春までにスキャンできます。
この実装クラスは、サーバーJARパッケージに配置されており、サービスを開始するためのサーバー構成ファイルとブートストラッププログラムも提供します。
ステップ3:サーバーを構成します
サーバーSpring構成ファイルはSpring.xmlという名前で、コンテンツは次のとおりです。
<bean ...> <context:component-scanベースパッケージ= "com.xxx.rpc.sample.server"/> <context:classpath:config.properties "/> <! - サービス登録コンポーネントの構成 - > <bean id =" serviceregistry "> <construction-arg ="> ">"> </bean> <! - rpc serverを構成 - > <bean id = "rpcserver"> <constructor-arg name = "serverAddress" value = "$ {server.address}"/> <constructor-arg name = "特定の構成パラメーターはconfig.propertiesファイルにあり、コンテンツは次のとおりです。
#Zookeeper Server Registry.Address = 127.0.0.1:2181#RPC Server Server.Address = 127.0.0.1:8000
上記の構成は、ローカルZookeeperサーバーが接続され、RPCサービスがポート8000でリリースされていることを示しています。
ステップ4:サーバーを起動し、サービスを公開します
Spring構成ファイルをロードしてサービスを公開するには、ブートローダーを作成するだけです。
public class rpcbootstrap {public static void main(string [] args){new ClassPathxMLApplicationContext( "Spring.xml"); }}RPCBootStrapクラスの主な方法を実行してサーバーを起動しますが、まだ実装されていない2つの重要なコンポーネント、つまりServiceregistryとRPCServerがあります。特定の実装の詳細を以下に示します。
ステップ5:サービス登録を実装します
サービス登録機能は、Zookeeperクライアントを使用して簡単に実装できます。 Serviceregistryコードは次のとおりです。
public class Serviceregistry {private static final logger logger = loggerfactory.getLogger(serviceregistry.class); Private CountDownLatch = new CountDownLatch(1);プライベートストリングレジストリドレス; public Serviceregistry(String RegistryAddress){this.registryAddress = registryAddress; } public void Register(String Data){if(data!= null){zookeeper zk = connectServer(); if(zk!= null){createNode(zk、data); }}} private zookeeper connectserver(){zookeeper zk = null; try {zk = new zookeeper(registryaddress、constant.zk_session_timeout、new watcher(){@Override public void Process(watchedEvent event){if(event.getState()== event.keeperstate.syncconected){latch.countdown();}}}); latch.await(); } catch(ioException | arturnedexception e){logger.error( ""、e); } zkを返します。 } private void createNode(zookeeper zk、string data){try {byte [] bytes = data.getBytes(); string path = zk.create(constant.zk_data_path、bytes、zoodefs.ids.open_acl_unsafe、createmode.ephemeral_sevesentient); logger.debug( "zookeeper node({} => {})"、path、data)を作成します。 } catch(kemperexception | arturtedexception e){logger.error( ""、e); }}}その中で、すべての定数は定数で構成されています。
パブリックインターフェイス定数{int zk_session_timeout = 5000;文字列zk_registry_path = "/registry";文字列zk_data_path = zk_registry_path + "/data";}注:まず、Zookeeperクライアントコマンドラインを使用して、すべての一時的なノードを保存するために、永続的なノードを作成/レジストリを作成する必要があります。
ステップ6:RPCサーバーを実装します
Nettyを使用すると、NIOをサポートするRPCサーバーを実装できます。サービスアドレスを登録するためにServiceregistryを使用する必要があります。 RPCServerコードは次のとおりです。
パブリッククラスrpcserverは、applicationcontextawareを実装しています。初期化bean {private static final logger logger = loggerfactory.getLogger(rpcserver.class); Private String ServerAddress;プライベートサービケレジストリーサービス。プライベートマップ<文字列、オブジェクト> handlermap = new Hashmap <>(); //インターフェイス名とサービスオブジェクトの間のマッピング関係を保存しますpublic rpcserver(string serverAddress){this.serveraddress = serverAddress; } public rpcserver(String ServerAddress、Serviceregistry ServiceRegistry){this.ServerAddress = serverAddress; this.serviceregistry = serviceregistry; } @Override public void setApplicationContext(applicationContext ctx)throws beansexception {map <string、object> servicebeanmap = ctx.getbeanswithannotation(rpcservice.class); // rpcservice annotations beanですべてのスプリングを取得するif(maputils.isnotempty(servicebeanmap)){for(object servicebean:servicebeanmap.values()){string interfaceName = servicebean.getClass()。getAnnotation(rpcservice.class)。 handlermap.put(interfaceName、servicebean); }}} @Override public void avtherpropertiesset()throws exception {eventloopgroup bossgroup = new nioeventloopgroup(); eventloopgroup workergroup = new nioeventloopgroup(); try {serverbootstrap bootstrap = new ServerBootStrap(); bootstrap.group(bossgroup、workergroup).channel(nioserversocketchannel.class).childhandler(new ChannelInitializer <Socketchannel>(){@override public void initchannel(socketchannelチャンネル)スロー例外{channel.pipeline().addlast(new rpcdecoder(rpcdecoder() RPCリクエスト(リクエストを処理するため).addlast(new rpcencoder(rpcresponse.class))// RPC応答をエンコードします(応答を返す)。 .childoption(channeloption.so_keepalive、true); string [] array = serverAddress.split( ":"); string host = array [0]; int port = integer.parseint(array [1]); Channelfuture Future = bootstrap.bind(host、port).sync(); logger.debug( "サーバーがport {}"、portで開始された); if(serviceregistry!= null){serviceregistry.register(serverAddress); //登録サービスアドレス} future.channel()。closefuture()。sync(); }最後に{workergroup.shutdowngracefuly(); BOSSGROUP.SHUTDOWNGRACELY(); }}}上記のコードには、記述する必要がある2つの重要なpoJO、つまりRPCRequestとrpcreSponseがあります。
RPCRequestを使用してRPCリクエストをカプセル化します。コードは次のとおりです。
public class rpcrequest {private string requestId;プライベート文字列className; private string methodname;プライベートクラス<?> [] parametertypes;プライベートオブジェクト[]パラメーター。 // getter/setter ...}RPCRESPONSEを使用してRPC応答をカプセル化します。コードは次のとおりです。
public class rpcreSponse {private string requestid;プライベートスロー可能なエラー;プライベートオブジェクトの結果; // getter/setter ...}RPCDeCoderを使用してRPCデコードを提供し、NettyのByteTomessageCoder Abstractクラスデコードメソッドを拡張するだけで、コードは次のとおりです。
パブリッククラスrpcdecoderは、bytetomessagedecoder {private class <?> genericclassを拡張します。 public rpcdecoder(class <?> genericclass){this.genericclass = genericclass; } @Override public void decode(channelandlercontext ctx、bytebuf in、list <object> out)スロー{if(in.readablebytes()<4){return; } in.markreaderIndex(); int dataLength = in.readint(); if(dataLength <0){ctx.close(); } if(in.readableBytes()<datAlength){in.resetreaderIndex();戻る; } byte [] data = new byte [datalength]; in.readbytes(data);オブジェクトobj = serializationutil.deserialize(data、genericclass); out.add(obj); }}RPCENCODERを使用してRPCエンコーディングを提供し、NettyのMessageTobyteCoder Abstractクラスエンコードメソッドを拡張するだけです。コードは次のとおりです。
パブリッククラスRPCENCODER拡張MessageTobyteConder {private class <?> genericClass; public rpcencoder(class <?> genericclass){this.genericclass = genericclass; } @Override public void Encode(ChannelHandlercontext ctx、object in、bytebuf out)throws exception {if(genericclass.isinstance(in)){byte [] data = serializationutil.serialize(in); out.writeint(data.length); out.writebytes(data); }}}SerializationUtil Toolクラスを作成し、プロトスタフを使用してシリアル化を実装します。
public class serializationutil {private static map <class <?私的静的objenesis objenesis = new objenesisstd(true); private serializationutil(){} @suppresswarnings( "unchecked")private static <t> schema <t> getschema(class <t> cls){schema <t> schema =(schema <t>)cachedschema.get(cls); if(schema == null){schema = runtimeschema.createfrom(cls); if(schema!= null){cachedschema.put(cls、schema); }} return schema; } @suppresswarnings( "un -checked")public static <t> byte [] serialize(t obj){class <t> cls =(class <t>)obj.getclass(); linkedbuffer buffer = linkedbuffer.allocate(linkedbuffer.default_buffer_size); try {schema <t> schema = getSchema(cls); protostuffioutil.tobytearray(obj、schema、buffer)を返します。 } catch(Exception e){新しいIllegalStateException(e.getMessage()、e); }最後に{buffer.clear(); }} public static <t> t Deserialize(byte [] data、class <t> cls){try {t message =(t)objenesis.newinstance(cls); schema <t> schema = getschema(cls); protostuffioutil.mergefrom(データ、メッセージ、スキーマ);メッセージを返します。 } catch(Exception e){新しいIllegalStateException(e.getMessage()、e); }}}上記では、Objenesisを使用してオブジェクトをインスタンス化します。これはJava反射よりも強力です。
注:他のシリアル化フレームワークを交換する必要がある場合は、SerializationUtilを変更するだけです。もちろん、それを実装するより良い方法は、使用するシリアル化方法を決定するための構成項目を提供することです。
RPCHANDLERでRPCリクエストを処理するには、NettyのSimpleChannelinboundlerの抽象クラスを拡張するだけです。コードは次のとおりです。
パブリッククラスrpchandlerは、SimpleChannelinBoundhandler <RPCRequest> {private static final logger logger = loggerfactory.getLogger(rpchandler.class);プライベート最終マップ<文字列、オブジェクト> handlermap; public rpchandler(map <string、object> handlermap){this.handlermap = handlermap; } @Override public void Channelread0(final channelandlercontext ctx、rpcrequest request)スロー例外{rpcresponse応答= new rpcreSponse(); Response.setRequestId(request.getRequestId()); try {object result = handle(request); Response.setResult(result); } catch(throwable t){respons.setError(t); } ctx.writeandflush(response).addlistener(channelfuturelistener.close); }プライベートオブジェクトハンドル(rpcrequest request)スロースロー可能{string classname = request.getClassName(); Object ServiceBean = handLermap.get(className); class <? string methodname = request.getMethodName(); class <? object [] parameters = request.getParameters(); /*method method = serviceclass.getMethod(methodName、parametertypes); method.setAccessible(true); return method.invoke(servicebean、パラメーター); fastmethod servicefastmethod = servicefastclass.getmethod(methodname、parametertypes); Return ServiceFastMethod.Invoke(ServiceBean、パラメーター); } @Override public void exceptionCaught(ChannelHandlerContext CTX、Throwable cause){logger.Error( "Server Catch Exception"、course); ctx.close(); }}Java Reflectionを使用して引き起こされるパフォーマンスの問題を回避するために、上記で使用したFastClassやFastMethodなど、CGLIBが提供する反射APIを使用できます。
ステップ7:クライアントを構成します
また、Spring構成ファイルを使用して、RPCクライアントを構成します。 spring.xmlコードは次のとおりです。
<bean ...> <context:Property-placeholder location = "classpath:config.properties"/> <! - configure service discovery component-> <bean id = "servicediscovery"> <constructor-arg name = "registryadress" value = "$ {registry.address}"/>> </bean> < <constructor-arg name = "servicediscovery" ref = "servicediscovery"/> </bean> </beans>config.propertiesは特定の構成を提供します。
#Zookeeper Server Registry.Address = 127.0.0.1:2181
ステップ8:サービスの発見を実装します
また、Zookeeperを使用してサービス発見機能を実装します。次のコードを参照してください。
public class servicediscovery {private static final logger logger = loggerfactory.getLogger(servicediscovery.class); Private CountDownLatch = new CountDownLatch(1);プライベート揮発性リスト<String> Datalist = new ArrayList <>();プライベートストリングレジストリドレス; public ServicedIscovery(String RegistryAddress){this.registryAddress = registryAddress; Zookeeper ZK = ConnectServer(); if(zk!= null){watchnode(zk); }} public string discover(){string data = null; int size = datalist.size(); if(size> 0){if(size == 1){data = datalist.get(0); logger.debug( "データのみを使用:{}"、data); } else {data = datalist.get(threadlocalrandom.current()。nextint(size)); logger.debug( "ランダムデータの使用:{}"、data); }}データを返します。 } private zookeeper connectServer(){zookeeper zk = null; try {zk = new zookeeper(registryaddress、constant.zk_session_timeout、new watcher(){@Override public void Process(watchedEvent event){if(event.getState()== event.keeperstate.syncconeded){latch.countdown();}}); latch.await(); } catch(ioException | arturnedexception e){logger.error( ""、e); } zkを返します。 } private void watchnode(final zookeeper zk){try {list <string> nodelist = zk.getChildren(constant.zk_registry_path、new watcher(){@override public void process(watchedevent event){if(event.gettype()== event.nodechildemode( }});リスト<String> datalist = new ArrayList <>(); for(string node:nodeList){byte [] bytes = zk.getData(custrance.zk_registry_path + "/" + node、false、null); datalist.add(new String(BYTES)); } logger.debug( "node data:{}"、datalist); this.datalist = datalist; } catch(kemperexception | arturtedexception e){logger.error( ""、e); }}}ステップ9:RPCエージェントの実装
ここでは、Javaが提供する動的プロキシテクノロジーを使用してRPCプロキシを実装します(もちろん、CGLIBを使用して実装することもできます)。特定のコードは次のとおりです。
パブリッククラスrpcproxy {private string serverAddress; Private ServicedIscovery ServicedIscovery; public rpcproxy(string serverAddress){this.serveraddress = serverAddress; } public rpcproxy(servicediscovery servicediscovery){this.servicediscovery = servicediscovery; } @suppresswarnings( "Unchecked")public <t> t Create(class <? Objects Throws {rpcrequest = new rpcrequest(); request.setparameTypes()request.setiscovery!= null); integer.parseint(array [1]); } else {return Response.getResult(); }}}); }}RPCCLientクラスを使用してRPCクライアントを実装するには、Nettyが提供するSimpleChannelinboundler Abstractクラスを拡張する必要があります。コードは次のとおりです。
パブリッククラスrpcclientは、SimpleChannelinBoundHandler <rpcreSponse> {private static final logger logger = loggerFactory.getLogger(rpcclient.class);プライベートストリングホスト。プライベートインターポート;プライベートRPCRESPONSE応答。プライベート最終オブジェクトobj = new object(); public rpcclient(string host、int port){this.host = host; this.port = port; } @Override public void ChannelRead0(ChannelHandlerContext CTX、rpcreSponse応答)スロー{this.response = response;同期(obj){obj.notifyall(); //応答を受信し、スレッドを起こします}} @Override public void exceptioncaught(ChannelHandlercontext ctx、スロー可能な原因)スロー例外{ogger.error( "クライアントキャッチ例外"、原因); ctx.close(); } public rpcreSponse send(rpcrequest request)スロー例外{eventloopgroup group = new nioeventloopgroup(); try {bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(niosocketchannel.class).handler(new ChannelInitializer <Socketchannel>(){@override public void initchannel(socketchannelチャンネル)スロー{channel.pipeline().addlast(new rpceScoder(rpcretest.class) / classのリクエスト) .addlast(new rpcdecoder(rpcresponse.class))// rpc応答をデコードする(rpcclient.this)。 Channelfuture Future = bootstrap.connect(host、port).sync(); future.channel()。writeandflush(request).sync();同期(obj){obj.wait(); //応答はありませんでしたが、スレッドが待機します} if(response!= null){future.channel()。closefuture()。sync(); }返信応答; }最後に{group.shutdowngracefuly(); }}}ステップ10:RPCリクエストを送信します
Junitを使用して、次のコードを使用して、Springと組み合わせてユニットテストを作成します。
@runwith(springjunit4classrunner.class)@contextconfiguration(locations = "classpath:spring.xml")public class helloservicetest {@autowired private rpcproxy rpcproxy; @test public void hellotest(){helloservice helloservice = rpcproxy.create(helloservice.class);文字列result = helloservice.hello( "world"); assert.assertequals( "hello!world"、result); }}上記のユニットテストを実行すると、予期しないことが発生しない場合は、緑色のバーが表示されます。
要約します
この記事では、Spring + Netty + Protostuff + Zookeeperを通じて軽量RPCフレームワークを実装しています。 Springを使用して依存関係インジェクションとパラメーター構成を提供し、Nettyを使用してNIOデータ送信を実装し、プロトスタフを使用してオブジェクトシリアル化を実装し、Zookeeperを使用してサービス登録と発見を実装します。このフレームワークを使用して、分散環境の任意のノードにサービスを展開できます。クライアントは、リモートインターフェイスを介してサーバーの特定の実装を呼び出し、サーバーとクライアントの開発を完全に分離し、大規模な分散アプリケーションの実装に関する基本的なサポートを提供します。
付録:Maven依存
<! - junit-> <dependency> <groupid> junit </groupid> <artifactid> junit </artifactid> <bersion> 4.11 </version> <scope> test </scope> </dependency> <! - > <依存関係> <groupid> org.slf4j </groupid> </artifactid> slf4j-log4j12 <バージョン> 1.7.7 </version> </dependency> <! - spring-> <dependency> <groupid> org.springframework </groupid> <artifactid> spring-context </artifactid> <バージョン> 3.2.12.Release </version> </dependency> <seplency> <グループ<グループ< <artifactid> spring-test </artifactid> <version> 3.2.12.release </version> <scope> test </scope> </dependency> <! - netty-> <依存関係> groupid> io.netty </groupid> <artifactid> netty-all </artifactid> < - > <依存関係> groupId> com.dyuproject.protostuff </groupid> <artifactid> protostuff-core </artifactid> <version> 1.0.8 </version> </dependency> <! - zookeeper - > <依存関係> groupid> org.apache.zokeeper </artifactid> </groupid> </artifactid> zoekid> <バージョン> 3.4.6 </version> </Dependency> <! - Apache Commons Collections-> <dependency> groupid> org.apache.commons </groupid> <artifactid> commons-collections4 </artifactid> 4.0 </version> </dependency> < <artifactid> objenesis </artifactid> <bersion> 2.1 </version> </dependency> <! - cglib - > <依存性> <groupid> cglib </groupid> <artifactid> cglib </artifactid> <バージョン> 3.1 </version> </dependency>