RPC, o llamada de procedimiento remoto, es simplemente para decirlo: llamar a los servicios en computadoras remotas es como llamar a los servicios locales.
RPC puede basarse en el protocolo HTTP o TCP. El servicio web es un RPC basado en el protocolo HTTP. Tiene un buen rendimiento multiplataforma, pero su rendimiento no es tan bueno como RPC basado en el protocolo TCP. Dos aspectos afectarán directamente el rendimiento de RPC, uno es el método de transmisión y el otro es la serialización.
Como todos sabemos, TCP es el protocolo de capa de transporte, HTTP es el protocolo de capa de aplicación y la capa de transporte está más debajo de la capa de aplicación. En términos de transmisión de datos, la capa inferior es más rápida. Por lo tanto, en general, TCP debe ser más rápido que HTTP. En cuanto a la serialización, Java proporciona el método de serialización predeterminado, pero en el caso de alta concurrencia, este método traerá algunos cuellos de botella de rendimiento, por lo que una serie de excelentes marcos de serialización han surgido en el mercado, como: ProtoBuf, Kryo, Hessian, Jackson, etc. Pueden reemplazar la serialización predeterminada de Java para proporcionar un rendimiento más eficiente.
Para apoyar la alta concurrencia, el bloqueo tradicional IO obviamente no es adecuado, por lo que necesitamos IO asíncrono, es decir, NIO. Java proporciona soluciones NIO, y Java 7 también proporciona un mejor soporte NIO.2. Implementar NIO con Java no es algo distante, pero debemos estar familiarizados con los detalles técnicos de NIO.
Necesitamos implementar servicios en diferentes nodos en un entorno distribuido, y a través del registro de servicios, el cliente puede descubrir automáticamente los servicios disponibles actualmente y llamar a estos servicios. Esto requiere un componente de registro de servicio para registrar todas las direcciones de servicio (incluidos: nombre de host y número de puerto) en un entorno distribuido.
La relación entre la aplicación, el servicio y el registro de servicio se muestra en la figura a continuación:
Se pueden publicar varios servicios en cada servidor. Estos servicios comparten un host y un puerto. En un entorno distribuido, se proporcionará un servidor para proporcionar conjuntamente servicios. Además, para evitar un solo punto de falla del registro de servicio, debe integrarse en un entorno de clúster.
Este artículo revelará el proceso específico de desarrollar un marco RPC distribuido ligero. Este marco se basa en el protocolo TCP, proporciona características de NIO, proporciona métodos de serialización eficientes y también tiene la capacidad de registrar y descubrir servicios.
De acuerdo con los requisitos técnicos anteriores, podemos usar la siguiente selección de tecnología:
Para dependencias de Maven relacionadas, consulte el último apéndice.
Paso 1: Escriba una interfaz de servicio
interfaz pública HELLOSERVICE {String Hello (Nombre de cadena);}Coloque esta interfaz en un paquete de jar de cliente independiente para su uso.
Paso 2: Escriba la clase de implementación de la interfaz de servicio
@RPCService (HelloService.Class) // Especifique la interfaz remota de la clase pública HelloserviceImpl implementa HELLOSERVICE {@Override public String Hello (Nombre de cadena) {return "¡Hola!" + nombre; }}Use la anotación RPCService para definir la clase de implementación de la interfaz de servicio. Debe especificar una interfaz remota para la clase de implementación, porque la clase de implementación puede implementar múltiples interfaces, por lo que debe indicar el marco que es la interfaz remota.
El código RPCService es el siguiente:
@Target ({elementType.type})@retención(CretentionPolicy.runtime)@Component // Indica que puede ser escaneada por Spring public @Interface RPCService {class <?> Value ();}Esta anotación tiene las características de la anotación de componentes de Spring y puede ser escaneada por la primavera.
Esta clase de implementación se coloca en el paquete JAR del servidor, que también proporciona algunos archivos de configuración del servidor y programas de arranque para iniciar el servicio.
Paso 3: Configurar el servidor
El archivo de configuración de Servidor Spring se llama Spring.xml, y el contenido es el siguiente:
<beans ...> <context: component-scan base-package = "com.xxx.rpc.sample.server"/> <context: property-placeholeation ubicación = "classpath: config.properties"/> <!-Configurar el servicio de registro de servicio-> <bean id = "ServiceReGistry"> <constructor-barg name = "registryddress" valor = "valor =" $ {{n/{{n/{{{{{{{{{} ""/"Registry" Registry </bean> <!-Configurar el servidor RPC-> <bean id = "rpcserver"> <constructor-arg name = "serverAddress" value = "$ {server.address}"/> <constructor-arg name = "ServiceRegistry" ref = "ServiceRegistry"/> </beapo> </ Beans>Los parámetros de configuración específicos están en el archivo config.properties, y el contenido es el siguiente:
# Registro del servidor Zookeeper.Address = 127.0.0.1: 2181# RPC Server Server.address = 127.0.0.1: 8000
La configuración anterior indica que el servidor Zookeeper local está conectado y el servicio RPC se libera en el puerto 8000.
Paso 4: inicie el servidor y publique el servicio
Para cargar archivos de configuración de Spring para publicar un servicio, simplemente escriba un cargador de arranque:
public class rpcbootstrap {public static void main (string [] args) {new ClassPathXMLApPlicationContext ("Spring.xml"); }}Ejecute el método principal de la clase RPCBOotstrap para iniciar el servidor, pero hay dos componentes importantes que aún no se han implementado, a saber: ServiceRegistry y RPCServer. Los detalles de implementación específicos se darán a continuación.
Paso 5: Implementar el registro del servicio
La función de registro de servicio se puede implementar fácilmente utilizando el cliente Zookeeper. El código de ServiceRegistry es el siguiente:
clase pública ServiceRegistry {private static final logger logger = loggerFactory.getLogger (ServiceRegistry.class); privado CountdownLatch Latch = new CountdownLatch (1); Registryddress de cadena privada; Public ServiceRegistry (String RegistryAddress) {this.registryAddress = RegistryAddress; } registro público void (string data) {if (data! = null) {ZOOKEEper zk = ConnectServer (); if (zk! = null) {createnode (zk, data); }}} private ZOOKEEper ConnectServer () {ZOOKEEPER ZK = NULL; Pruebe {zk = new Zookeeper (RegistryAdDress, constant.zk_session_timeout, new Watcher () {@Override public void Process (WatchedEvent Event) {if (event.getState () == Event.keeperstate.syncconnected) {Latch.Countdown ();}}}); Latch.Await (); } catch (ioException | InterruptedException e) {logger.error ("", e); } return zk; } private void createNode (ZOOKEEper Zk, data de cadena) {try {byte [] bytes = data.getBytes (); String ruta = zk.create (constant.zk_data_path, bytes, zoonefs.ids.open_acl_unsafe, createMode.ephemeral_sequential); Logger.debug ("Crear nodo Zookeeper ({} => {})", ruta, datos); } catch (KeeperException | InterruptedException e) {logger.error ("", e); }}}Entre ellas, todas las constantes se configuran a través de constante:
interfaz pública constante {int zk_session_timeout = 5000; Cadena zk_registry_path = "/registro"; Cadena zk_data_path = zk_registry_path + "/data";}Nota: Primero, debe utilizar la línea de comandos del cliente ZOOKEEper para crear/registrar nodos permanentes para almacenar todos los nodos temporales de servicio.
Paso 6: Implemente el servidor RPC
El uso de Netty puede implementar un servidor RPC que admite NIO. Debe utilizar ServiceRegistry para registrar la dirección de servicio. El código RPCServer es el siguiente:
Public Class RPCServer implementa ApplicationContextAware, InitializingBean {private static final logger = loggerFactory.getLogger (rpcserver.class); String String Private ServerAddress; Servicio de administración privada Servicreegistro; mapa privado <string, object> handlermap = new HashMap <> (); // Almacene la relación de mapeo entre el nombre de la interfaz y el objeto de servicio public rpcserver (String ServerAddress) {this.serverAddress = serverAddress; } public rpcServer (String ServerAddress, ServiceRegistry ServiceRegistry) {this.serverAddress = serverAddress; this.serviceRegistry = ServiceRegistry; } @Override public void setApplicationContext (ApplicationContext CTX) lanza Beansexception {MAP <String, Object> ServiceBeanMap = CTX.GetBeAnswitHannotation (rpcservice.class); // Obtenga todos los resortes con anotaciones RPCService Bean if (mapUtils.isNotEmpty (ServiceBeanMap)) {for (Object ServiceBean: ServiceBeanMap.Values ()) {String interfaceName = ServiceBean.getClass (). handlermap.put (InterfaCename, ServiceBean); }}} @Override public void AfterPropertIesset () lanza la excepción {EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup WorkerGroup = new NioEventLoopGroup (); intente {ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group (BossGroup, WorkerGroup) .Channel (nioserversocketchannel.class) .childHandler (nuevo canalinitializer <Socketchannel> () {@Override public void initchannel (socketchannel canal) Excepción {canal.pipeline () .addlast (new rpcDecoder (rpCeCheStems). Solicitud RPC (para manejar la solicitud) .AddLast (nuevo RPCENCODER (RPCRESPONSE.Class)) // codifica la respuesta RPC (para devolver la respuesta) .AddLast (new rpchandler (handlermap)); verdadero); String [] array = serverAddress.split (":"); String host = array [0]; int port = Integer.ParseInt (Array [1]); Channelfuture futuro = bootstrap.bind (host, puerto) .sync (); Logger.debug ("Servidor iniciado en el puerto {}", puerto); if (ServiceRegistry! = NULL) {ServiceRegistry.Register (ServerAddress); // registrar la dirección de servicio} futuro.channel (). CloseFuture (). Sync (); } Finalmente {WorkerGroup.shutdownGRacefly (); bossgroup.shutdowngracefly (); }}}En el código anterior, hay dos Pojos importantes que deben describirse, a saber, RPCRequest y RPCResponse.
Use RPCRequest para encapsular las solicitudes de RPC, el código es el siguiente:
public class rpCrequest {private String requestId; Cadena privada ClassName; Método de cadena privada; clase privada <?> [] Parametertypes; parámetros de objeto privado []; // getter/setter ...}Use RPCResponse para encapsular la respuesta RPC, el código es el siguiente:
public class rpCResponse {private String requestId; Error de lanzamiento privado; resultado del objeto privado; // getter/setter ...}Use RPCDECoder para proporcionar una decodificación de RPC, solo extienda el método de decodificación de clase abstracta de ByTetomessageCoder de Netty, el código es el siguiente:
Public Class RPCDECoder extiende ByTetomessageCoder {Clase privado <?> GenericClass; public rpcDecoder (class <?> GenericClass) {this.genericClass = GenericClass; } @Override public void Decode (ChannelHandLerContext CTX, ByteBuf en, List <S Object> out) lanza la excepción {if (in.ReadableBytes () <4) {return; } in.markeaderIndex (); int dataLength = in.readInt (); if (dataLength <0) {ctx.close (); } if (in.readableBytes () <dataLength) {in.resetreaderIndex (); devolver; } byte [] data = new Byte [DataLength]; in.ReadBytes (datos); Obj obj = SerializationUtil.deserialize (datos, genicClass); out.Add (obj); }}Use RPCENCODER para proporcionar una codificación RPC, solo extiende el método de codificación de la clase abstracto MessageTobyteCoder de Netty, el código es el siguiente:
Public Class RPCencoder extiende MessageTobyteCoder {private clase <?> GenericClass; public rpcencoder (class <?> GenericClass) {this.genericClass = GenericClass; } @Override public void encode (ChannelHandLerContext CTX, Object in, ByteBuf Out) lanza la excepción {if (genicClass.isInstance (in)) {byte [] data = serializationUtil.serialize (in); out.writeInt (data.length); out.writytes (datos); }}}Escriba una clase de herramientas SerializationUtil y use ProTostuff para implementar la serialización:
clase pública SerializationUtil {Mapa estático privado <class <?>, esquema <? >> Cachedschema = new ConcurrenthashMap <> (); Objenais estática privada objenais = new ObjenesisStd (verdadero); Private SerializationUtil () {} @supresswarnings ("no verificado") estática privada <t> esquema <t> getschema (class <t> cls) {esquema <t> esquema = (esquema <t>) cachedschema.get (cls); if (schema == null) {schema = runTimeschema.createFrom (CLS); if (schema! = null) {cachedschema.put (cls, esquema); }} esquema de retorno; } @Suppleswarnings ("sin verificar") public static <t> byte [] serialize (t obj) {class <t> cls = (class <t>) obj.getClass (); LinkedBuffer Buffer = LinkedBuffer.allocate (LinkedBuffer.default_buffer_size); intente {Schema <T> Schema = GetSchema (CLS); return ProtoStuffioutil.TobyTearray (obj, esquema, amortiguador); } Catch (Exception e) {Throw New IlegalStateException (e.getMessage (), e); } finalmente {buffer.clear (); }} public static <t> t deserialize (byte [] data, class <t> cls) {try {t mensaje = (t) objenosis.newinstance (CLS); Esquema <t> esquema = GetSchema (CLS); Protostuffioutil.mergeFrom (datos, mensaje, esquema); Mensaje de devolución; } Catch (Exception e) {Throw New IlegalStateException (e.getMessage (), e); }}}Lo anterior usa objenesis para instanciar objetos, que es más poderoso que la reflexión de Java.
Nota: Si necesita reemplazar otros marcos de serialización, simplemente modifique el SerializationUtil. Por supuesto, una mejor manera de implementarlo es proporcionar elementos de configuración para decidir qué método de serialización usar.
Para manejar las solicitudes de RPC en RPCHANDLER, solo necesita extender la clase abstracta de SimpleChannelinBoundHandler de Netty, el código es el siguiente:
Public Class RpChandler extiende SimpleChannelInboundHandler <rpCrequest> {private static final logger logger = loggerFactory.getLogger (rpchandler.class); Mapa final privado <String, Object> HandLermap; public rpchandler (map <string, object> handlermap) {this.handlermap = handlermap; } @Override public void Channelead0 (final ChannelHandlerContext CTX, RPCRequest Solicitud) lanza la excepción {RPCRESPONDE respuesta = new rpCResponse (); Response.setRequestid (request.getRequestid ()); intente {objeto resultado = handle (request); respuesta.setResult (resultado); } catch (showable t) {respuesta.setError (t); } ctx.writeandflush (respuesta) .AddListener (ChannelFutureListener.close); } Mango de objeto privado (solicitud RPCREQUEST) lanza lando {string className = request.getClassName (); Object ServiceBean = handlermap.get (className); Clase <?> ServiceClass = ServiceBean.getClass (); String MethodName = request.getMethodName (); Clase <?> [] Parametertypes = request.getParametertypes (); Objeto [] parámetros = request.getParameter (); /*Método método = serviceClass.getMethod (MethodName, ParametertyPes); método.setAccessible (verdadero); Method de retorno FastMethod ServiceFastMethod = ServiceFastClass.getMethod (MethodName, ParametertyPes); return ServiceFastMethod.invoke (ServiceBean, Parámetros); } @Override public void ExceptionCaught (ChannelHandlerContext CTX, Throwable Causa) {logger.error ("Excepción de captura del servidor", causa); ctx.close (); }}Para evitar los problemas de rendimiento causados por el uso de la reflexión de Java, podemos usar la API de reflexión proporcionada por CGLIB, como FastClass y FastMethod utilizados anteriormente.
Paso 7: Configure el cliente
También use archivos de configuración de Spring para configurar el cliente RPC. El código Spring.xml es el siguiente:
<frijoles ...> <context: Property-placeholder ubicación = "classpath: config.properties"/> <!-Configurar componente de descubrimiento de servicio-> <bean id = "ServiceScovery"> <Constructor-arg name = "RegistryAddress" Value = "$ {Registry.addddress}"/> </bean> <! <constructor-arg name = "ServiceScovery" REF = "ServiceDesscovery"/> </bean> </beans>config.properties proporciona una configuración específica:
# Registro del servidor Zookeeper.address = 127.0.0.1: 2181
Paso 8: Implementar el descubrimiento de servicios
Utilice también Zookeeper para implementar la función de descubrimiento de servicios, consulte el siguiente código:
Class Public ServiceScovery {private static final logger logger = loggerFactory.getLogger (ServiceCeScovery.class); privado CountdownLatch Latch = new CountdownLatch (1); Lista volátil privada <String> DataList = new ArrayList <> (); Registryddress de cadena privada; Public ServiceScovery (String RegistryAddress) {this.RegistryAddress = RegistryAddress; ZOOKEEPER ZK = ConnectServer (); if (zk! = null) {watchNode (zk); }} public String String Discover () {String data = null; int size = dataList.size (); if (size> 0) {if (size == 1) {data = dataList.get (0); Logger.debug ("Usando solo datos: {}", datos); } else {data = dataList.get (ThreadLocalRandom.Current (). NextInt (size)); Logger.debug ("Usando datos aleatorios: {}", datos); }} Datos de retorno; } private ZOOKEEper ConnectServer () {ZOOKEEPER ZK = NULL; Pruebe {zk = new Zookeeper (RegistryAdDress, constant.zk_session_timeout, new Watcher () {@Override public void Process (WatchedEvent Event) {if (event.getState () == Event.keeperstate.syncconnected) {Latch.CountDown ();}}); Latch.Await (); } catch (ioException | InterruptedException e) {logger.error ("", e); } return zk; } private void watchNode (final ZOOKEEPER ZK) {try {list <string> nodElist = zk.getChildren (constant.zk_registry_path, new Watcher () {@Override public void Process (WatchedEvent Event) {if (Event.Gettype () == Event.eventType.nodechilDenDhenChed) {WatchEvent Event) }}); List <String> DataList = new ArrayList <> (); for (nodo de cadena: nodelist) {byte [] bytes = zk.getData (constant.zk_registry_path + "/" + nodo, falso, null); dataList.Add (nueva cadena (bytes)); } Logger.debug ("Datos de nodo: {}", dataList); this.datalist = dataList; } catch (KeeperException | InterruptedException e) {logger.error ("", e); }}}Paso 9: Implementación del agente RPC
Aquí utilizamos la tecnología dinámica proxy proporcionada por Java para implementar el proxy RPC (por supuesto, también se puede implementar utilizando CGLIB). El código específico es el siguiente:
public class rpcproxy {private String ServerAddress; Servicio de servicio privado Servicio de servicios; public rpcproxy (String ServerAddress) {this.serverAddress = serverAddress; } public rpcproxy (ServiceCovery ServiceScovery) {this.serviceScovery = ServiceScovery; } @Suppleswarnings ("no verchado") public <t> t create (class <?> InterfaceClass) {return (t) proxy.newproxyInstance (interfaceClass.getClassLoader (), nueva clase <?> [] {InterfacecLass}, nueva invocatoria () { @ @ @ @ @ @ @ @@Override Public Invoke (NEW CLASS Args) lanza {RPCREQUEST SOLIT = new RPCRequest (); request.setParametTypes (Method.getParametTypes ()); Integer.ParseInt (Array [1]); } else {return respuesta.getResult (); }}}); }}Para implementar el cliente RPC utilizando la clase RPCClient, solo necesita extender la clase de resumen SimpleChannelInBoundHandler proporcionada por Netty, el código es el siguiente:
Public Class RPCClient extiende SimpleChannelInBoundHandler <rpcResponse> {private static final logger = loggerFactory.getLogger (rpccclient.class); host de cadena privada; Puerto privado int; Respuesta privada de RPCResponse; objador final privado obj = nuevo objeto (); public rpcClient (string host, int puerto) {this.host = host; this.port = puerto; } @Override public void Channelead0 (ChannelHandlerContext CTX, RPCResponse Respuesta) lanza la excepción {this.Response = Response; sincronizado (obj) {obj.notifyall (); // recibir una respuesta, despertar el hilo}} @Override public void ExceptionCaught (ChannelHandLerContext CTX, Throwable Cause) lanza la excepción {logger.error ("Excepción de captura del cliente", causa); ctx.close (); } public rpCResponse send (solicitud RPCRequest) lanza la excepción {EventLoopGroup Group = new NioeventLoopGroup (); intente {bootstrap bootstrap = new Bootstrap (); bootstrap.group (grupo) .channel (niosocketchannel.class) .handler (nuevo canalInitializer <Socketchannel> () {@Override public void initchannel (socketchannel canal) Exception {canal.pipeline () .addLast (nuevo rpcencoder (rppCrequest.class) // enchode the ret the slety (settel (new rpCiCoder) .AddLast (nuevo RPCDECODER (rpcResponse.class)) // decodifica la respuesta RPC (para manejar la respuesta) .addlast (rpcclient.this) Channelfuture futuro = bootstrap.connect (host, puerto) .sync (); futuro.channel (). WRITEEANDFLUSH (solicitud) .sync (); sincronizado (obj) {obj.wait (); // no se recibió respuesta, lo que hace que el hilo espere} if (respuesta! = Null) {futuro.channel (). CloseFuture (). Sync (); } Respuesta de retorno; } finalmente {group.shutDownGRacefly (); }}}Paso 10: Enviar solicitud RPC
Use Junit para escribir una prueba unitaria en combinación con Spring, con el siguiente código:
@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (ubicaciones = "classpath: spring.xml") public class HelloServiceTetest {@aUtowired RPCProxy rpcProxy; @Test public void hellotest () {HelloService HelloService = rpcproxy.create (HelloService.class); Result de cadena = helloService.hello ("mundo"); Afirmar.assertecals ("¡Hola! Mundo", resultado); }}Ejecute las pruebas unitarias anteriores y si no sucede nada inesperado, debería ver la barra verde.
Resumir
Este artículo implementa un marco RPC ligero a través de Spring + Netty + ProTostuff + Zookeeper. Utiliza Spring para proporcionar inyección de dependencia y configuración de parámetros, utiliza Netty para implementar la transmisión de datos de NIO, utiliza Protostuff para implementar la serialización de objetos y utiliza Zookeeper para implementar el registro y el descubrimiento del servicio. Usando este marco, los servicios se pueden implementar en cualquier nodo en un entorno distribuido. El cliente llama a la implementación específica del servidor a través de una interfaz remota, separando por completo el desarrollo del servidor y el cliente, proporcionando soporte básico para la implementación de aplicaciones distribuidas a gran escala.
Apéndice: dependencia de Maven
< <versión> 1.7.7 </ververy> </pepertency> <!-spring-> <epardency> <mupoupid> org.springframework </proupid> <artifactid> spring-context </artifactid> <versión> 3.2.12.release </versión> </dependency> <epartency> <proupid> org.springframework <//groupid> <spendency> spring <tiMid> spring <tiMid> springract </artest </artest </art. <versión> 3.2.12.release </versewer> <cope> test </scope> </pendency> <!-Netty-> <pendency> <MoupRoD> io.netty </groupid> <artifactid> netty-all </artifactId> <versión> 4.0.24.final </lelpersion> </dependency> <!-Protostuff-> <pendency> <MoupRid> com.dyuproject.proTostuff </groupid> <artifactid> Protostuff-core </artifactid> <verserse> 1.0.8 </versewers> </pendency> <!-Zookeepers-> <PERSEDENCY> <MOUTMOUD> ORG.APACHE.ZOOKEEPER </Groupid> <Artifactid> ZOOKEEPER </artifactid> <Versión> 3.4.6 </versión> </pendency> <!-Apache Commons Collections-> <Spendency> <MoupRid> org.apache.commons </groupid> <artifactid> commons-colections4 </arfactid> <versión> 4.0 </versión> </dependencia> <!-objénesis-> <pendency> <proupid> org.objenesish> <artifactid> objenosis </arfactid> <verserse> 2.1 </verversion> </pendency> <!-cglib-> <pendency> <proupid> cglib </proupid> <artifactid> cglib </artifactid> <versión> 3.1 </sersion> </pendency>>