RPC, ou chamada de procedimento remoto, é apenas para simplificar: chamar serviços em computadores remotos é como chamar serviços locais.
O RPC pode ser baseado no protocolo HTTP ou TCP. O serviço da Web é um RPC baseado no protocolo HTTP. Ele tem um bom desempenho de plataforma cruzada, mas seu desempenho não é tão bom quanto o RPC baseado no protocolo TCP. Dois aspectos afetarão diretamente o desempenho do RPC, um é o método de transmissão e o outro é a serialização.
Como todos sabemos, o TCP é o protocolo da camada de transporte, o HTTP é o protocolo da camada de aplicação e a camada de transporte está mais abaixo da camada de aplicação. Em termos de transmissão de dados, a camada inferior é mais rápida. Portanto, em geral, o TCP deve ser mais rápido que o HTTP. As for serialization, Java provides the default serialization method, but in the case of high concurrency, this method will bring some performance bottlenecks, so a series of excellent serialization frameworks have emerged on the market, such as: Protobuf, Kryo, Hessian, Jackson, etc. They can replace Java's default serialization to provide more efficient performance.
Para apoiar a alta concorrência, o bloqueio tradicional de IO obviamente não é adequado, por isso precisamos de IO assíncrono, ou seja, Nio. O Java fornece soluções de NIO e o Java 7 também fornece melhor suporte ao NIO.2. A implementação do NIO com Java não é uma coisa distante, mas precisamos estar familiarizados com os detalhes técnicos da NIO.
Precisamos implantar serviços em diferentes nós em um ambiente distribuído e, por meio do registro de serviços, o cliente pode descobrir automaticamente os serviços atualmente disponíveis e chamar esses serviços. Isso requer um componente do Registro de Serviço para registrar todos os endereços de serviço (incluindo: nome do host e número da porta) em um ambiente distribuído.
A relação entre o aplicativo, o serviço e o registro de serviço é mostrada na figura abaixo:
Vários serviços podem ser publicados em cada servidor. Esses serviços compartilham um host e porto. Em um ambiente distribuído, o servidor será fornecido para fornecer serviços em conjunto. Além disso, para evitar um único ponto de falha do registro de serviço, ele precisa ser incorporado em um ambiente de cluster.
Este artigo revelará o processo específico de desenvolvimento de uma estrutura RPC distribuída leve. Essa estrutura é baseada no protocolo TCP, fornece recursos do NIO, fornece métodos de serialização eficientes e também tem a capacidade de registrar e descobrir serviços.
De acordo com os requisitos técnicos acima, podemos usar a seguinte seleção de tecnologia:
Para dependências relacionadas ao Maven, consulte o último apêndice.
Etapa 1: Escreva uma interface de serviço
interface pública Heloservice {String hello (nome da string);}Coloque esta interface em um pacote de jar com cliente independente para uso.
Etapa 2: Escreva a classe de implementação da interface de serviço
@Rpcservice (helloservice.class) // Especifique a interface remota public classe HelloserviceImpl implementa Heloservice {@Override public String Hello (Nome da String) {Retorno "Hello!" + nome; }}Use a anotação RPCService para definir a classe de implementação da interface de serviço. Você precisa especificar uma interface remota para a classe de implementação, porque a classe de implementação pode implementar várias interfaces; portanto, você deve informar à estrutura que é a interface remota.
O código RPCService é o seguinte:
@Target ({elementType.type})@retention(retEntionPolicy.runtime)@component // indica que pode ser digitalizado pelo público da primavera @Interface rpcService {class <?> Value ();}Esta anotação tem as características da anotação do componente da primavera e pode ser digitalizada na primavera.
Esta classe de implementação é colocada no pacote JAR do servidor, que também fornece alguns arquivos de configuração do servidor e programas de bootstrap para iniciar o serviço.
Etapa 3: configure o servidor
O arquivo de configuração de mola do servidor é nomeado Spring.xml, e o conteúdo é o seguinte:
<Beans ...> <Contexto: Componente-Scan Base-Package = "com.xxx.rpc.sample.server"/> <Contexto: Property-Placterholder Location = "ClassPath: config.properties"/> <!-Configure Service Registration Component-> <Bean ID = "ServiceRgistry"> value = "$ {Registry.address}"/> </ Bean> <!-Configure o servidor RPC-> <bean id = "rpcServer"> <construtor-arg name = "serverAddress" value = "$ {server.address}/> <Constructor/Areg Name" <ServiceRistry "REFRIMENTO" REFRIMENTO "REFRIMENTO">Os parâmetros de configuração específicos estão no arquivo config.properties, e o conteúdo é o seguinte:
# Zookeeper Server Registry.address = 127.0.0.1: 2181# RPC Server Server.address = 127.0.0.1: 8000
A configuração acima indica que o servidor Zookeeper local está conectado e o serviço RPC é liberado na porta 8000.
Etapa 4: inicie o servidor e publique o serviço
Para carregar arquivos de configuração de mola para publicar um serviço, basta escrever um carregador de inicialização:
classe pública rpcbootstrap {public static void main (string [] args) {new ClassPathXMLApplicationContext ("spring.xml"); }}Execute o método principal da classe RPCBootStrap para iniciar o servidor, mas existem dois componentes importantes que ainda não foram implementados, a saber: Serviceregistry e RPCServer. Os detalhes específicos da implementação serão fornecidos abaixo.
Etapa 5: Implementar registro de serviço
A função de registro de serviço pode ser facilmente implementada usando o cliente Zookeeper. O Código do Serviço é o seguinte:
classe pública serviceregistry {private static final logger logger = loggerFactory.getLogger (serviceregistry.class); Countdownlatch privado Latch = new Countdownlatch (1); RegistryDres de String Private; public serviceregistry (registryAddress) {this.registryAddress = RegistryAddress; } public void Register (String Data) {if (data! = null) {Zookeeper zk = ConnectServer (); if (zk! = null) {createNode (zk, dados); }}} private zookeeper ConnectServer () {Zookeeper zk = null; tente {zk = new Zookeeper (RegistryAddress, constant.zk_session_timeout, new Watcher () {@Override public void Process (Event WatchEDEvent) {if (event.getState () == event.KeePerstate.syncConected) {latch.countDown ();}} latch.await (); } catch (ioexception | interruptedException e) {logger.error ("", e); } retornar zk; } private void CreateNode (Zookeeper ZK, String Data) {try {byte [] bytes = data.getBytes (); String Path = zk.create (constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, cretemode.ephemeral_sequencial); Logger.debug ("Criar nó Zookeeper ({} => {})", caminho, dados); } catch (keeterexception | interruptedException e) {logger.error ("", e); }}}Entre eles, todas as constantes são configuradas por constante:
interface pública constante {int zk_session_timeout = 5000; String zk_registry_path = "/Registry"; String zk_data_path = zk_registry_path + "/data";}Nota: Primeiro, você precisa usar a linha de comando do cliente Zookeeper para criar/registrar nós permanentes para armazenar todos os nós temporários de serviço.
Etapa 6: implemente o servidor RPC
O uso da Netty pode implementar um servidor RPC que suporta o NIO. Você precisa usar o Serviceregistry para registrar o endereço de serviço. O código RPCServer é o seguinte:
classe pública rpcServer implementa ApplicationContexTAWare, InitializandoBean {private estático Final Logger = LoggerFactory.getLogger (rpcserver.class); Private String ServerAddress; Serviço privado Serviceregistry; mapa privado <string, objeto> handlermap = new hashmap <> (); // Armazene a relação de mapeamento entre o nome da interface e o objeto de serviço public rpcServer (String ServerAddress) {this.serveraddress = serverAddress; } public rpcServer (string serverAddress, serviceregistry serviceregistry) {this.serveraddress = serverAddress; this.Serviceregistry = Serviceregistry; } @Override public void setApplicationContext (ApplicationContext ctx) lança beansexception {map <string, object> serviceBeanMap = ctx.getBeanswithannotation (rpcservice.class); // Obtenha todas as molas com as anotações de RPCService Bean if (maputils.isnotEmpty (ServiceBeanMap)) {for (object ServiceBean: ServiceBeanMap.Values ()) {String interfacename = ServiceBean.getClass (). handlermap.put (interfacename, serviceBean); }}} @Override public void depoisPropertiESSET () lança Exceção {Eventloopgroup BossGroup = new NioEventloopgroup (); Eventloopgroup WorkerGroup = new NioEventloopgroup (); tente {serverbootstrap bootstrap = new ServerBootStrap (); bootstrap.group (BossGroup, Workergroup) .Channel (nioserversocketchannel.class) .childHandler (new ChannelInitializer <SocketchAnnel> () {@Override Public void initchannel (canal socketchannel) Excepção {Channel.piPeline () .AddlasT (SocketchAnel) Solicitação RPC (para manipular a solicitação) .Adlast (novo rpcencoder (rpcResponse.class)) // codifica a resposta RPC (para retornar a resposta) .addlast (new rpchandler (handlemap); String [] Array = ServerAddress.split (":"); String host = matriz [0]; int porta = integer.parseint (matriz [1]); Channelfuture futuro = bootstrap.bind (host, porta) .sync (); Logger.debug ("Servidor iniciado na porta {}", porta); if (serviceregistry! = null) {serviceregistry.register (serverAddress); // Endereço de serviço de registro} futuro.Channel (). CloseFuture (). Sync (); } finalmente {workergroup.shutdownGracely (); bossgroup.shutdownGracely (); }}}No código acima, existem dois pojos importantes que precisam ser descritos, a saber, RPCRequest e RPcResponse.
Use RPCRequest para encapsular solicitações de RPC, o código é o seguinte:
classe pública rpcrequest {private string requestId; String private String ClassName; MethodName de String Private; classe privada <?> [] parameterTypes; Objeto privado [] parâmetros; // getter/setter ...}Use RPcResponse para encapsular a resposta do RPC, o código é o seguinte:
classe pública rpcResponse {private string requestId; erro de lançamento privado; resultado de objeto privado; // getter/setter ...}Use o RPCDecoder para fornecer decodificação de RPC, basta estender o método de decodificação de classe ByTetomessAgedEcoder da Netty, o código é o seguinte:
classe pública rpcdecoder estende o bytetomessagedecoder {classe privada <?> genérico; public rpcDecoder (classe <?> genericClass) {this.genericClass = genericClass; } @Override public void decode (ChannelHandlerContext CTX, BYTEBUF in, List <Becut> out) lança exceção {if (in.ReablelableBytes () <4) {return; } in.markReadIndex (); int datalength = in.readInT (); if (datalength <0) {ctx.close (); } if (in.ReAtableBytes () <DataLength) {in.ResetReadeRIndex (); retornar; } byte [] dados = novo byte [datallength]; in.readbytes (dados); Objeto obj = serializationUtil.DeseRialize (dados, genericClass); out.add (obj); }}Use o RPCEncoder para fornecer a codificação RPC, basta estender o método de codificação da classe MessageTobyteEncoder da Netty, o código é o seguinte:
classe pública rpcencoder estende o messagetobyteEncoder {classe privada <?> genericclass; public rpcencoder (classe <?> genericClass) {this.genericClass = genericClass; } @Override public void Encode (ChannelHandlerContext ctx, objeto in, bytebuf out) lança exceção {if (genericclass.isInsinStance (in)) {byte [] data = serializationutil.Serialize (in); out.WriteInt (data.length); out.WriteBytes (dados); }}}Escreva uma classe de ferramentas de serialização e use o Protostuff para implementar a serialização:
classe pública serializationUtil {private estático mapa <classe <?>, esquema <? >> cachedschema = new concursonthashmap <> (); Objena estática privada objenesis = novo objenesestd (true); Serialização privada () {} @suppresswarnings ("desmarcada") estática privada <t> esquema <t> getschema (classe <t> cls) {schema <t> schema = (esquema <t>) cachedschema.get (cls); if (schema == null) {schema = runTimeschema.createfrom (cls); if (schema! = null) {cachedschema.put (cls, esquema); }} retornar esquema; } @Suppresswarnings ("desmarcado") public static <t> byte [] serialize (t obj) {class <t> cls = (classe <T>) obj.getClass (); Buffer do LinkedBuffer = LinkEdBuffer.Allocate (LinkedBuffer.Default_Buffer_Size); tente {schema <t> esquema = getschema (cls); Retornar protostuffioutil.tobytearray (obj, esquema, buffer); } catch (Exceção e) {lança new IllegalStateException (e.getMessage (), e); } finalmente {buffer.clear (); }} public static <t> t Deserialize (byte [] dados, classe <T> cls) {try {t mensagem = (t) objenese.newInstance (cls); Esquema <t> esquema = getschema (cls); Protostuffioutil.mergefrom (dados, mensagem, esquema); devolver mensagem; } catch (Exceção e) {lança new IllegalStateException (e.getMessage (), e); }}}O exposto acima usa a objesese para instanciar objetos, que são mais poderosos que a reflexão de Java.
NOTA: Se você precisar substituir outras estruturas de serialização, basta modificar o serialização. Obviamente, uma maneira melhor de implementá -lo é fornecer itens de configuração para decidir qual método de serialização para usar.
Para lidar com solicitações de RPC em RPchandler, você só precisa estender a classe abstrata do SIMNECHANNELINBOUNDLER da Netty, o código é o seguinte:
classe pública rpchandler estende o SimpleChannelInBoundHandler <RPCRequest> {private Static Final Logger Logger = LoggerFactory.getLogger (rpchandler.class); mapa final privado <string, objeto> handlermap; public rpchandler (map <string, object> handlermap) {this.handlerMap = handlermap; } @Override public void ChannelRead0 (Final ChannelHandlerContext CTX, solicitação RPCRequest) lança exceção {RPcResponse Response = new RPcResponse (); Response.setRequestId (request.getRequestId ()); tente {resultado do objeto = handle (request); Response.setResult (resultado); } catch (throwable t) {Response.Sterror (t); } ctx.writeAndflush (resposta) .addlistener (ChannelFutureListener.close); } identificador de objeto privado (solicitação RPCRequest) lança arremesso {string className = request.getClassName (); Objeto serviceBean = handlermap.get (className); Classe <?> ServiceClass = serviceBean.getClass (); String MethodName = request.getMethodName (); Classe <?> [] ParameterTypes = request.getParameterTypes (); Objeto [] parâmetros = request.getParameters (); /*Método Método = ServiceClass.getMethod (MethodName, ParameterTypes); Method.setAccessible (true); Return Method.inVoke (ServiceBean, Parâmetros);*/ FastClass ServiceFastClass = FastClass.Create (ServiceClass); FastMethod ServiceFastMethod = ServiceFastClass.getMethod (MethodName, ParameterTypes); return ServiceFastMethod.inVoke (ServiceBean, Parâmetros); } @Override public void ExceptionCaught (ChannelHandlerContext ctx, causa de arremesso) {Logger.error ("Exceção de captura do servidor", causa); ctx.close (); }}Para evitar os problemas de desempenho causados pelo uso da reflexão do Java, podemos usar a API de reflexão fornecida pelo CGLIB, como FastClass e FastMethod usada acima.
Etapa 7: configure o cliente
Use também os arquivos de configuração do Spring para configurar o cliente RPC. O código spring.xml é o seguinte:
<Beans ...> <Contexto: Propriedade-Place-tholder Location = "ClassPath: config.properties"/> <!-Configure componente de descoberta de serviço-> <bean id = "ServiceCovery"> <construtor-arg name = "RegistryAddress" id = "rpcProxy"> <construtor-arg name = "ServiceDiscovery" ref = "ServiceScobery"/> </sheans> </sheans>
Config.Properties fornece configuração específica:
# Zookeeper Server Registry.address = 127.0.0.1: 2181
Etapa 8: Implementar a descoberta de serviços
Use também o Zookeeper para implementar a função de descoberta de serviços, consulte o seguinte código:
public class Servicediscovery {private Static Final Logger Logger = LoggerFactory.getLogger (ServiceCovery.class); Countdownlatch privado Latch = new Countdownlatch (1); Lista volátil privada <String> Datalist = new ArrayList <> (); RegistryDres de String Private; Public ServiceScobery (String RegistryAddress) {this.RegistryAddress = RegistryAddress; Zookeeper zk = ConnectServer (); if (zk! = null) {watchNode (zk); }} public String Discover () {String data = null; int size = datalist.size (); if (size> 0) {if (size == 1) {data = datalist.get (0); Logger.debug ("Usando apenas dados: {}", dados); } else {data = datalist.get (threadlocalrandom.current (). nextInt (size)); Logger.debug ("Usando dados aleatórios: {}", dados); }} retornar dados; } private zookeeper ConnectServer () {Zookeeper zk = null; tente {zk = new Zookeeper (RegistryAddress, constant.zk_session_timeout, new Watcher () {@Override public void Process (Event WatchEDEvent) {if (event.getState () == Event.KeePerstate.SyncConected) {latch.countDown ();}}; latch.await (); } catch (ioexception | interruptedException e) {logger.error ("", e); } retornar zk; } private void watchNode (zookeeper final ZK) {try {list <string> nodelist = zk.getChildren (constant.zk_registry_path, new watcher () {@Override Public Void Process (WatchEdEvent Event) {if (event.gettype () == Event.EventTyPeRy.NodEnChild); }); List <String> Datalist = new ArrayList <> (); para (nó da string: nodelist) {byte [] bytes = zk.getdata (constant.zk_registry_path + "/" + node, false, null); datalist.add (new string (bytes)); } Logger.debug ("dados do nó: {}", datalist); this.Datalist = Datalist; } catch (keeterexception | interruptedException e) {logger.error ("", e); }}}Etapa 9: Implementando o agente RPC
Aqui, usamos a tecnologia dinâmica de proxy fornecida pela Java para implementar o proxy RPC (é claro, ela também pode ser implementada usando o CGLIB). O código específico é o seguinte:
classe pública rpcproxy {private string serverAddress; Descoberta de serviço de serviço privado; public RPCProxy (String ServerAddress) {this.Serveraddress = ServerAddress; } public rpcProxy (Serviço de divisão de serviço) {this.Servicediscovery = Servicediscovery; } @Suppresswarnings ("desmarcados") public <T> t cria (classe <?> Interfaceclass) {return (t) proxy.newproxyInstance (interfaceclass.getClassLoader (), nova classe <? args) lança o arremesso {RPCRequest Solicy = new RPCRequest (); request.setParameterTypes (Method.getParameterTypes (); Inteiro.parsent (Array [1]); } else {return Response.getResult (); }}}); }}Para implementar o cliente RPC usando a classe RPCClient, você só precisa estender a classe abstrata SimplechannelInBoundHandler fornecida pela Netty, o código é o seguinte:
classe pública rpcclient estende o SimpleChannelInBoundHandler <RPcResponse> {private estático final logger) = LoggerFactory.getLogger (rpcclient.class); host privado string; private int porta; Resposta privada de RPcResponse; objeto final privado obj = new Object (); public rpcClient (host da string, int porta) {this.host = host; this.port = porta; } @Override public void ChannelRead0 (ChannelHandlerContext CTX, resposta rpcResponse) lança a exceção {this.Response = Response; sincronizado (obj) {obj.NotifyAll (); // Receba uma resposta, acorde o thread}} @Override public void ExceptionCaught (ChannelHandlerContext ctx, causa de arremesso) lança exceção {Logger.error ("Exceção de captura do cliente", causa); ctx.close (); } public rpcResponse envia (solicitação rpcrequest) lança exceção {Eventloopgroup Group = new nioEventloopgroup (); tente {bootstrap bootstrap = new bootstrap (); bootstrap.group (grupo) .channel (niosocketchannel.class) .Handler (novo canalinitializer <CocketchAnnel> () {@Override public void initchannel (canal socketchannel) lança a excepção {Channel.piPeline () .addlast (new RpcEncerp (rpcEnCoMP). .Addlast (novo rpcdecoder (rpcResponse.class)) // decodifique a resposta RPC (para lidar com a resposta) .addlast (rpcclient.This); Channelfuture Future = bootstrap.connect (host, porta) .sync (); futuro.channel (). writeAndflush (solicitação) .sync (); sincronizado (obj) {obj.wait (); // Nenhuma resposta foi recebida, fazendo com que o thread aguarde} if (resposta! = Null) {future.channel (). CloseFuture (). Sync (); } resposta de retorno; } finalmente {group.shutdownGracely (); }}}Etapa 10: Envie a solicitação RPC
Use Junit para escrever um teste de unidade em combinação com a primavera, com o seguinte código:
@Runwith (springjunit4classrunner.class) @ContextConfiguration (locations = "ClassPath: spring.xml") classe pública heloserviceTest {@AUTOWIRED PRIVADO RPCPROXY RPCPROXY; @Test public void hellotest () {Helloservice helloservice = rpcProxy.create (helloservice.class); Resultado da string = helloservice.hello ("mundo"); Assert.asserTequals ("Hello! World", resultado); }}Execute os testes de unidade acima e, se nada inesperado acontecer, você verá a barra verde.
Resumir
Este artigo implementa uma estrutura RPC leve através da Spring + Netty + Protostuff + Zookeeper. Ele usa o Spring para fornecer injeção de dependência e configuração de parâmetros, usa a Netty para implementar a transmissão de dados do NIO, usa o ProtoSFUFF para implementar a serialização do objeto e usa o Zookeeper para implementar o registro e a descoberta de serviço. Usando essa estrutura, os serviços podem ser implantados em qualquer nó em um ambiente distribuído. O cliente chama a implementação específica do servidor por meio de uma interface remota, separando completamente o desenvolvimento do servidor e do cliente, fornecendo suporte básico para a implementação de aplicativos distribuídos em larga escala.
Apêndice: Dependência do Maven
<!-Junit-> <Depencency> <voundiD> Junit </proupId> <TRARFACTID> JUNIT </STIFACTID> <Versão> 4.11 </siers> <cope> teste </scope> </dependency> <!-slf4j-> <pendency> <puperid> org.slf4j </grupidid> artiftId> slftactid> slftactId-> <pendency> <voupid> org.slf4j </grupId> ArtiftId> <version>1.7.7</version></dependency><!-- Spring --><dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.2.12.RELEASE</version></dependency><dependency> <groupId>org.springframework</groupId> <TeRtifactId> Spring-test </stifactId> <versão> 3.2.12.Release </sisters> <cope> Test </scope> </dependency> <!-netty-> <pendency> <purifactid> <setty> </foupid> <TarifactId> netty-all </stifactId> <ugerty >.20 versão </groupid> <TorkActid> </Artifactid> <erigns> </setty> </roupid> <stifactId> netty-all </stifactId> <ermENC> </siperty >.20s (! -> <Depencency> <voundId> com.dyuproject.protoStuff </groupiD> <TRAFACTID> Protostuff-core </ArtifactId> <Versão> 1.0.8 </versão> </dependency> <!-zookeeper-> <pendency> <purid> org.apache.zookeeper </Grupoid> <version>3.4.6</version></dependency><!-- Apache Commons Collections --><dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.0</version></dependency><!-- Objenesis --><dependency> <groupId>org.objenesis</groupId> <TarfactId> objenesis </stutifactId> <versão> 2.1 </version> </dependency> <!-cglib-> <pendency> <puperid> cglib </groupid> <sutifactId> cglib </artifactId> <versão> 3.1 </versão </pendence>