RPC, ou appel à distance, est simplement de le dire simplement: appeler des services sur des ordinateurs distants, c'est comme appeler des services locaux.
RPC peut être basé sur le protocole HTTP ou TCP. Le service Web est un RPC basé sur le protocole HTTP. Il a de bonnes performances multiplateforme, mais ses performances ne sont pas aussi bonnes que RPC basées sur le protocole TCP. Deux aspects affecteront directement les performances du RPC, l'une est la méthode de transmission et l'autre est la sérialisation.
Comme nous le savons tous, TCP est le protocole de couche de transport, HTTP est le protocole de couche d'application et la couche de transport est plus sous la couche d'application. En termes de transmission de données, la couche inférieure est plus rapide. Par conséquent, en général, le TCP doit être plus rapide que HTTP. En ce qui concerne la sérialisation, Java fournit la méthode de sérialisation par défaut, mais dans le cas d'une concurrence élevée, cette méthode apportera des goulots d'étranglement de performances, de sorte qu'une série d'excellents cadres de sérialisation a émergé sur le marché, tels que: Protobuf, Kryo, Hessian, Jackson, etc. Ils peuvent remplacer la sérialisation par défaut de Java pour fournir des performances plus efficaces.
Pour soutenir une concurrence élevée, le blocage traditionnel IO ne convient évidemment pas, nous avons donc besoin d'IO asynchrone, c'est-à-dire Nio. Java fournit des solutions NIO et Java 7 fournit également un meilleur support NIO.2. La mise en œuvre de NIO avec Java n'est pas une chose lointaine, mais nous devons être familiarisés avec les détails techniques de NIO.
Nous devons déployer des services sur différents nœuds dans un environnement distribué, et grâce à l'enregistrement des services, le client peut découvrir automatiquement les services actuellement disponibles et appeler ces services. Cela nécessite un composant de registre de services pour enregistrer toutes les adresses de service (y compris: nom d'hôte et numéro de port) dans un environnement distribué.
La relation entre le registre de l'application, du service et des services est indiquée dans la figure ci-dessous:
Plusieurs services peuvent être publiés sur chaque serveur. Ces services partagent un hôte et un port. Dans un environnement distribué, le serveur sera fourni pour fournir conjointement des services. De plus, pour éviter le point de défaillance unique du registre des services, il doit être intégré dans un environnement de cluster.
Cet article révèlera le processus spécifique de développement d'un cadre RPC distribué léger. Ce cadre est basé sur le protocole TCP, fournit des fonctionnalités NIO, fournit des méthodes de sérialisation efficaces et a également la possibilité de s'inscrire et de découvrir des services.
Selon les exigences techniques ci-dessus, nous pouvons utiliser la sélection de technologie suivante:
Pour les dépendances Maven connexes, veuillez consulter le dernier annexe.
Étape 1: Écrivez une interface de service
Interface publique Helloservice {String Hello (Nom de la chaîne);}Placez cette interface dans un package JAR client autonome à utiliser.
Étape 2: Écrivez la classe d'implémentation de l'interface de service
@RpcService (helloService.class) // Spécifiez la classe publique de l'interface distante HelloserviceIMPl implémente Helloservice {@Override public String Hello (Nom de chaîne) {return "Bonjour!" + nom; }}Utilisez l'annotation RPCService pour définir la classe d'implémentation de l'interface de service. Vous devez spécifier une interface distante pour la classe d'implémentation, car la classe d'implémentation peut implémenter plusieurs interfaces, vous devez donc indiquer le cadre qui est l'interface distante.
Le code RPCService est le suivant:
@Target ({elementType.type}) @ rétention(retentionPolicy.runtime)@component // indique qu'il peut être scanné par Spring public @interface rpcService {class <?> Value ();};Cette annotation a les caractéristiques de l'annotation des composants de Spring et peut être scannée par le printemps.
Cette classe d'implémentation est placée dans le package JAR Server, qui fournit également des fichiers de configuration de serveur et des programmes bootstrap pour le démarrage du service.
Étape 3: Configurez le serveur
Le fichier de configuration de Spring est nommé Spring.xml et le contenu est le suivant:
<Beans ...> <Context: Component-Scan Base-Package = "com.xxx.rpc.sample.server" /> <context: propriété-placeholder location = "classpath: config.properties" /> <! - Configurer le service d'enregistrement de service -> <bean id = "ServiceRegistry"> <constructeur-arg name = "RegistryAddress" Value = "$ {Registry} ADME` `Name =" RegistryAddress "Value =" $ {aadddress} "/" / "RegistryAddress" </EAN> <! - Configurer le serveur RPC -> <bean id = "RPCServer"> <Constructor-Arg name = "ServerAddress" value = "$ {server.address}" /> <constructor-arg name = "ServiceRegistry" Ref = "ServiceRegistry" /> </ean> </bEANS>Les paramètres de configuration spécifiques se trouvent dans le fichier config.properties, et le contenu est le suivant:
# ZooKeeper Server Registry.Address = 127.0.0.1: 2181 # RPC Server Server.Address = 127.0.0.1: 8000
La configuration ci-dessus indique que le serveur ZooKeeper local est connecté et que le service RPC est publié sur le port 8000.
Étape 4: Démarrez le serveur et publiez le service
Pour charger des fichiers de configuration de ressort pour publier un service, écrivez simplement un chargeur de démarrage:
public class rpcbootstrap {public static void main (string [] args) {new classPathxmlApplicationContext ("spring.xml"); }}Exécutez la méthode principale de la classe RPCBootstrap pour démarrer le serveur, mais il existe deux composants importants qui n'ont pas encore été implémentés, à savoir: ServiceRegistry et RPCServer. Les détails de mise en œuvre spécifiques seront donnés ci-dessous.
Étape 5: Mettre en œuvre l'inscription des services
La fonction d'enregistrement de service peut être facilement implémentée à l'aide du client ZooKeeper. Le code ServiceRegistry est le suivant:
classe publique ServiceRegistry {private static final logger logger = loggerfactory.getLogger (ServiceRegistry.class); CountdownLatch privé Latch = nouveau compte à rebours (1); String RegistryAddress privé; public ServiceRegistry (String RegistryAddress) {this.RegistryAddress = RegistryAddress; } public void registre (String data) {if (data! = null) {zookeeper zk = connectServer (); if (zk! = null) {Creenode (zk, data); }}} private zookeeper connectServer () {zookeeper zk = null; try {zk = new ZooKeeper (RegistryAddress, constante.zk_session_timeout, new watcher () {@Override public void process (watchdevent event) {if (event.getState () == event.keeperstate.syncconnected) {latch.countDown ();}}}); latch.Await (); } catch (ioException | InterruptedException e) {Logger.Error ("", e); } return zk; } private void createNode (zookeeper zk, string data) {try {byte [] bytes = data.getBytes (); String path = zk.create (constant.zk_data_path, bytes, zoods.ids.open_acl_unsafe, createmode.ephemeral_sequential); Logger.debug ("Create ZooKeeper Node ({} => {})", path, data); } Catch (gardien de base | InterruptedException e) {Logger.Error ("", e); }}}Parmi eux, toutes les constantes sont configurées par constante:
Interface publique constante {int zk_session_timeout = 5000; String zk_registry_path = "/ registre"; String zk_data_path = zk_registry_path + "/ data";}Remarque: Tout d'abord, vous devez utiliser la ligne de commande du client Zookeeper pour créer / registre des nœuds permanents pour stocker tous les nœuds de service temporaires.
Étape 6: Implémentez le serveur RPC
L'utilisation de Netty peut implémenter un serveur RPC qui prend en charge NIO. Vous devez utiliser ServiceRegistry pour enregistrer l'adresse du service. Le code RPCServer est le suivant:
classe publique RPCServer implémente ApplicationContextAware, InitializingBean {private static final logger logger = loggerfactory.getLogger (rpcServer.class); String Private ServerAddress; ServiceRegistry privé ServiceRegistry; map privé <chaîne, objet> handLerMap = new hashmap <> (); // Stockez la relation de mappage entre le nom de l'interface et l'objet de service RPCServer (String ServerAddress) {this.serverAddress = serverAddress; } public RPCServer (String ServerAddress, ServiceRegistry ServiceRegistry) {this.serverAddress = serverAddress; this.serviceRegistry = ServiceRegistry; } @Override public void setApplicationContext (applicationContext ctx) lève BeanSexception {map <string, object> ServiceBeanMap = ctx.getBeanSwithannotation (rpcService.class); // Obtenez toutes les ressorts avec rpcService annotations bean if (maputils.isnotempty (ServiceBeAnMap)) {for (object ServiceBean: ServiceBeanMap.Values ()) {String interfacename = ServiceBean.getClass (). GetNamedation (rpcService.class) .Value (). GetName (); handlermap.put (interfacename, serviceBean); }}} @Override public void AfterProperTesTet () lève une exception {eventLoopGroup BossGroup = new NioEventLoopGroup (); EventLoopGroup WorkerGroup = new NioEventLoopGroup (); try {serverbootstrap bootstrap = new serverbootstrap (); Bootstrap.group (BossGroup, WorkerGroup) .Channel (NioserVersocketchannel.class) .ChildHandler (New Channenitializer <Socketchannel> () {@Override public void Initchannel (Socketchannel Channel) lance Exception {Channel.Pipeline () .Addlast (New RpcDecode RPC Demande (pour gérer la demande) .Addlast (nouveau RPCencoder (RPCRESSONCE.CLASS) // Encode la réponse RPC (pour retourner la réponse) .Addlast (nouveau RPChandler (HandlerMap)); String [] array = serverAddress.split (":"); String host = array [0]; int port = Integer.ParseInt (array [1]); ChannelfUture Future = bootstrap.bind (hôte, port) .Sync (); Logger.debug ("Server démarré sur le port {}", port); if (ServiceRegistry! = null) {ServiceRegistry.Register (ServerAddress); // Enregistrer l'adresse du service} futur.Channel (). CloseFuture (). Sync (); } enfin {workerGroup.shutdowngracely (); bossgroup.shutdowngracely (); }}}Dans le code ci-dessus, il y a deux Pojos importants qui doivent être décrits, à savoir RPCrequest et RPCResponse.
Utilisez RPCRequest pour encapsuler les demandes RPC, le code est le suivant:
classe publique rpCrequest {private String requestId; ClassName de chaîne privée; Méthode de chaîne privée; classe privée <?> [] ParamètreTypes; Paramètres d'objet privé []; // Getter / Setter ...}Utilisez RPCResponse pour résumer la réponse RPC, le code est le suivant:
classe publique rpcResponse {private String demandeId; Erreur jetable privée; résultat d'objet privé; // Getter / Setter ...}Utilisez RPCDECODER pour fournir un décodage RPC, il suffit d'étendre la méthode de décodage de classe abstraite ByteToMessageDeccoder, le code est le suivant:
La classe publique RPCDECODER étend ByteToMessageDecoder {Private Class <?> GenericClass; public rpcdecoder (class <?> genericclass) {this.genericclass = genericclass; } @Override public void Decode (ChannelHandlerContext ctx, bytebuf in, list <object> out) lève une exception {if (in.readableBytes () <4) {return; } in.markreaderIndex (); int datalngle = in.readInt (); if (datalngle <0) {ctx.close (); } if (in.readableBytes () <datalngle) {in.resetRenerIndex (); retour; } octet [] data = nouveau octet [datalngle]; in.readBytes (données); Objet obj = serializationUtil.deserialize (data, genericclass); out.add (obj); }}Utilisez RPCencoder pour fournir un codage RPC, étendez simplement MessagetoByTeencoder Résumé de la méthode de la classe de classe, le code est le suivant:
La classe publique RPCencoder étend MessageToByteencoder {Private class <?> GenericClass; public rpCencoder (class <?> genericclass) {this.genericclass = genericclass; } @Override public void Encode (ChannelHandlerContext ctx, objet in, bytebuf out) lève exception {if (genericclass.isinstance (in)) {byte [] data = serializationUtil.serialize (in); out.writeInt (data.length); out.writeBytes (données); }}}Écrivez une classe d'outils SerializationUtil et utilisez Protostuff pour implémenter la sérialisation:
classe publique SerializationUtil {Map statique privé <classe <?>, schéma <? >> cachedSchema = new concurrenthashmap <> (); Objenesis Objenesis Objenesis = Nouvelle Objenesisstd (Vrai); private SerializationUtil () {} @SuppressWarnings ("Unchecked") Schema statique <t> privé <T> GetSchema (classe <T> CLS) {Schema <T> Schema = (Schema <T>) CacheSchema.get (CLS); if (schema == null) {schema = runtimeschema.CreateFrom (CLS); if (schéma! = null) {cachedSchema.put (CLS, schéma); }} Retour schéma; } @SuppressWarnings ("Unchecked") public static <T> byte [] serialize (t obj) {class <t> cls = (class <t>) obj.getClass (); LinkedBuffer Buffer = LinkedBuffer.AllOcy (LinkedBuffer.Default_Buffer_Size); essayez {schéma <t> schema = getSchema (CLS); return protostuffioutil.tobytearray (obj, schéma, tampon); } catch (exception e) {lancer un nouveau illégalStateException (e.getMessage (), e); } enfin {buffer.clear (); }} public static <T> t Desérialize (byte [] data, class <t> CLS) {try {t message = (t) objenesis.newinstance (CLS); Schema <T> Schema = GetSchema (CLS); Protostuffioutil.mergeFrom (données, message, schéma); retour du message; } catch (exception e) {lancer un nouveau illégalStateException (e.getMessage (), e); }}}Ce qui précède utilise l'objénèse pour instancier des objets, ce qui est plus puissant que la réflexion Java.
Remarque: Si vous devez remplacer d'autres cadres de sérialisation, modifiez simplement le SerializationUtil. Bien sûr, une meilleure façon de l'implémenter est de fournir des éléments de configuration pour décider quelle méthode de sérialisation utiliser.
Pour gérer les demandes de RPC dans RPChandler, il vous suffit d'étendre la classe abstraite SimpleChanNeLinboundHandler de Netty, le code est le suivant:
La classe publique RPChandler étend SimpleChanNeLinboundHandler <RPCRequest> {private static final logger Logger = LoggerFactory.getLogger (rpchandler.class); Carte finale privée <String, objet> handlermap; public rpchandler (map <string, objet> handlermap) {this.handlermap = handlermap; } @Override public void ChannelRead0 (Final ChannelHandlerContext CTX, RPCRequest Request) lève une exception {RPCResponse Response = new rpcResponse (); réponse.setRequeStid (request.getRequeStid ()); essayez {objet résultat = manche (request); réponse.setResult (résultat); } catch (Throwable T) {Response.SetError (t); } ctx.writeAndflush (réponse) .AddListener (ChannelfUtutureListener.close); } La poignée d'objet privé (RPCRequest Request) lève le throwsable {String className = request.getClassName (); Object ServiceBean = handLerMap.get (className); Classe <?> ServiceClass = ServiceBean.getClass (); String MethodName = request.getMethodName (); Class <?> [] ParameterTypes = request.getParameterTypes (); Objet [] Paramètres = request.getParameters (); / * Méthode méthode = ServiceClass.getMethod (méthodyName, ParameterTypes); Method.SetAccessible (true); return Method.invoke (ServiceBean, Paramètres); * / FastClass ServiceFastClass = FastClass.Create (ServiceClass); FastMethod ServiceFastMethod = ServiceFastClass.getMethod (MethodName, ParameterTypes); return ServiceFastMethod.invoke (ServiceBean, Paramètres); } @Override public void exceptionCaught (canalshandlerContext ctx, cause throwable) {logger.error ("serveur de catch exception", cause); ctx.close (); }}Afin d'éviter les problèmes de performances causés par l'utilisation de la réflexion Java, nous pouvons utiliser l'API de réflexion fournie par CGLIB, comme FastClass et FastMethod utilisés ci-dessus.
Étape 7: Configurez le client
Utilisez également des fichiers de configuration Spring pour configurer le client RPC. Le code Spring.xml est le suivant:
<Beans ...> <Context: propriété-placeholder location = "classPath: config.properties" /> <! - Configurer le composant de découverte de service -> <bean id = "Servicediscovery"> <Constructor-arg name = "RegistryAddress" value = "$ {registry.address}" /> </ean> <! <Constructor-arg name = "Servicediscovery" ref = "Servicediscovery" /> </EAN> </EANS>config.properties fournit une configuration spécifique:
# ZooKeeper Server Registry.Address = 127.0.0.1: 2181
Étape 8: Mettre en œuvre la découverte des services
Utilisez également Zookeeper pour implémenter la fonction de découverte de services, voir le code suivant:
classe publique Servicediscovery {private static final logger logger = loggerfactory.getLogger (sertvicediscovery.class); CountdownLatch privé Latch = nouveau compte à rebours (1); Liste volatile privée <string> datalist = new ArrayList <> (); String RegistryAddress privé; public Servicediscovery (String RegistryAddress) {this.registryAddress = RegistryAddress; ZOOKEEPER ZK = ConnectServer (); if (zk! = null) {watchNode (zk); }} public String Discover () {String data = null; int size = datalist.size (); if (size> 0) {if (size == 1) {data = datalist.get (0); Logger.debug ("Utilisation uniquement de données: {}", données); } else {data = datalist.get (threadLocalrandom.current (). NextInt (taille)); Logger.debug ("Utilisation de données aléatoires: {}", données); }} return data; } private zookeeper connectServer () {zookeeper zk = null; try {zk = new ZooKeeper (RegistryAddress, constante.zk_session_timeout, new watcher () {@Override public void process (watchEdEvent Event) {if (event.getState () == Event.keeperstate); latch.Await (); } catch (ioException | InterruptedException e) {Logger.Error ("", e); } return zk; } private void watchode (final zookeeper zk) {try {list <string> nodelist = zk.getchildren (constante.zk_registry_path, new watcher () {@Override public void process (watchEntevent Event) {if (event.getType () == event.eventType.NodeChildRedRe }); List <string> datalist = new ArrayList <> (); pour (nœud de chaîne: nodeList) {byte [] bytes = zk.getData (constante.zk_registry_path + "/" + nœud, false, null); datalist.add (nouvelle chaîne (octets)); } Logger.debug ("données de nœud: {}", datalist); this.datalist = datalist; } Catch (gardien de base | InterruptedException e) {Logger.Error ("", e); }}}Étape 9: Implémentation de l'agent RPC
Ici, nous utilisons la technologie de proxy dynamique fournie par Java pour implémenter le proxy RPC (bien sûr, il peut également être mis en œuvre à l'aide de CGLIB). Le code spécifique est le suivant:
classe publique rpcproxy {private String ServerAddress; Les services privés sont entièrement desservis; public rpcproxy (String ServerAddress) {this.serveraddress = serverAddress; } public rpcproxy (desservissecovery desservicedcovery) {this.servicediscovery = sertvicediscovery; } @SuppressWarnings ("Unchecked") public <T> t Create (class <?> InterfaceClass) {return (t) proxy.newproxyinstance (interfaceClass.getClassloader (), nouvelle classe <?> [] {InterfaceClass}, new invocationhandler () {@Override Public Object Invory (Object Proxy, ObjectHandler () {@Override Public Invory (Object Proxy, MethodHandler () {@Override Public Invroy Args) lance le throws {rpCrequest request = new rpCrequest (); request.setParameterTypes (Methand.GetParAmterTypeS ()); Integer.Parseint (Array [1]); } else {return réponse.getResult (); }}}); }}Pour implémenter le client RPC à l'aide de la classe RPCCLIENT, il vous suffit d'étendre la classe abstraite SimpleChanNelinboundHandler fournie par Netty, le code est le suivant:
La classe publique RPCCLIENT étend SimpleChanNeLinBoundHandler <RPCResponse> {Private Static Final Logger Logger = LoggerFactory.getLogger (RPCCLIENT.class); hôte de chaîne privée; port int privé; Réponse privée RPCResponse; objet final privé obj = nouveau objet (); public rpcclient (string host, int port) {this.host = host; this.port = port; } @Override public void ChannelRead0 (ChannelHandlerContext CTX, RPCResponse Response) lève l'exception {this.Response = Response; synchronisé (obj) {obj.notifyall (); // reçoit une réponse, réveillez le thread}} @Override public void exceptioncaught (canalshandlerContext ctx, cause throws) lève une exception {logger.error ("exception de catch client", cause); ctx.close (); } public RPCResponse Send (RPCRequest Request) lève une exception {EventLoopGroup Group = new NioEventLoopGroup (); try {bootstrap bootstrap = new bootstrap (); Bootstrap.group (groupe) .Channel (Niosocketchannel.class) .Handler (new ChannelInitializer <Socketchannel> () {@Override public void Initchannel (canal socketchannel) exception {Channel.Pipeline () .Addlast (New RPCencoder (RPCrequest.Class.Class) //code la demande RPC (RPCrequest.Class)) // .AddLast (new RPCDECODER (RPCRESSONCE.CLASS) // Décode la réponse RPC (pour gérer la réponse) .Addlast (RPCCLIENT.CI); ChannelfUture Future = bootstrap.connect (hôte, port) .Sync (); Future.Channel (). WriteAndflush (demande) .Sync (); synchronisé (obj) {obj.wait (); // Aucune réponse n'a été reçue, ce qui fait attendre le thread} if (réponse! = Null) {future.Channel (). CloseFuture (). Sync (); } Retour Response; } enfin {groupe.shutdownGracely (); }}}Étape 10: Envoyez la demande de RPC
Utilisez Junit pour écrire un test unitaire en combinaison avec Spring, avec le code suivant:
@Runwith (SpringJunit4ClassRunner.class) @ContextConfiguration (Locations = "ClassPath: Spring.xml") public class HellOServiceTest {@autowired private rpcproxy rpcproxy; @Test public void hellotest () {Helloservice Helloservice = RPCProxy.create (HellOservice.class); Résultat de la chaîne = HellOservice.hello ("World"); Affirmer.AsseserTequals ("Hello! World", résultat); }}Exécutez les tests unitaires ci-dessus et si rien de inattendu ne se produit, vous devriez voir la barre verte.
Résumer
Cet article met en œuvre un cadre RPC léger via Spring + Netty + Protostuff + Zookeeper. Il utilise Spring pour fournir une injection de dépendance et une configuration de paramètres, utilise Netty pour implémenter la transmission des données NIO, utilise Protostuff pour implémenter la sérialisation des objets et utilise Zookeeper pour implémenter l'enregistrement et la découverte des services. En utilisant ce cadre, les services peuvent être déployés sur n'importe quel nœud dans un environnement distribué. Le client appelle l'implémentation spécifique du serveur via une interface distante, séparant complètement le développement du serveur et du client, fournissant une prise en charge de base pour l'implémentation d'applications distribuées à grande échelle.
Annexe: Dépendance Maven
<! - JUnit -> <dependency> <ProupId> Junit </rompuprid> <Ertifactid> JUnit </ artifactive> <version> 4.11 </ version> <ccope> Test </ccope> </Dependency> <! - Slf4j -> <Dependency> <ProupId> Org.slf4j </prounid> <ArtifActid> SLF4J-LOG4J <version> 1.7.7 </ version> </ dépendance> <! - Spring -> <dependency> <proupId> org.springFramework </proncId> <ArtifActid> Spring-Context </ artifactid> <version> 3.2.12.release </ version> </ Dependency> <Dedency> <ProupId> ORG.SPRINGFRAMEWROWNWORK </rom groupent> <lefactId> Spring-Teste </ ArtIdAcTrawork </rom groupent> <version> 3.2.12.release </preinte> <cope> test </cope> </dependency> <! - Netty -> <dependency> <proupId> io.netty </proupid> <artifactId> netty-all </retifactid> <version> 4.0.24.Final </ version> </fedency> <! - Protostuff -> <Dedency> <GroupId> com.dyuproject.protostuff </rompuprid> <letifactive> protostuff-core </ artifactid> <version> 1.0.8 </ version> </peedency> <! - ZooKeeper -> <Dedency> <proupId> org.apache.zookeeper </proupId> <pteridency> zookeeper </ artifactive> <version> 3.4.6 </ version> </ dépendance> <! - Apache Commons Collections -> <Dedency> <ProupId> org.apache.commons </proupId> <ArtefactId> Comons-Collections4 </ Arfactid> <Desentency> <proupId> </ Dependency> <! - OBJENESS -> <Dedency> <ProupId> Org.ObJensh <ArtefactId> Objenesis </ ArfactId> <DERSE> 2.1 </ Version> </Dependency> <! - CGLIB -> <Dependency> <ProupId> CGLIB </ProupID> <ArtefactId> Cglib </retifactid> <version> 3.1 </DERNIERSE> </Dependance>