RPC или удаленный вызов процедуры - это просто выразить это просто: вызов сервисов на удаленных компьютерах - это все равно, что вызов локальных услуг.
RPC может быть основан на HTTP или протоколе TCP. Веб -сервис - это RPC на основе протокола HTTP. Он обладает хорошей кроссплатформенной производительностью, но его производительность не так хороша, как RPC на основе протокола TCP. Два аспекта будут напрямую влиять на производительность RPC, один из них является методом передачи, а другой - сериализация.
Как мы все знаем, TCP является протоколом транспортного уровня, HTTP является протоколом прикладного уровня, а транспортный уровень больше находится под прикладным уровнем. С точки зрения передачи данных, нижний слой быстрее. Следовательно, в целом, TCP должен быть быстрее, чем http. Что касается сериализации, Java предоставляет метод сериализации по умолчанию, но в случае высокой параллелистики этот метод принесет некоторые узкие места производительности, поэтому на рынке появилась серия превосходных структур сериализации, таких как: Protobuf, Kryo, Hessian, Jackson и т. Д. Они могут заменить сериализацию Java, чтобы обеспечить более эффективную производительность.
Для поддержки высокой параллели, традиционная блокировка IO, очевидно, не подходит, поэтому нам нужен асинхронный IO, то есть Nio. Java предоставляет Nio Solutions, а Java 7 также обеспечивает лучшую поддержку NIO.2. Реализация NIO с Java не является далекой, но мы должны быть знакомы с техническими деталями NIO.
Нам необходимо развернуть услуги на разных узлах в распределенной среде, и благодаря регистрации услуг клиент может автоматически обнаружить доступные в настоящее время услуги и позвонить в эти услуги. Это требует компонента реестра услуг для регистрации всех адресов обслуживания (включая: имя хоста и номер порта) в распределенной среде.
Взаимосвязь между регистрацией приложений, обслуживания и обслуживания показана на рисунке ниже:
Несколько сервисов могут быть опубликованы на каждом сервере. Эти услуги делятся хостом и портом. В распределенной среде будет предоставлен сервер для совместного предоставления услуг. Кроме того, чтобы предотвратить единую точку отказа от реестра обслуживания, он должен быть встроен в кластерную среду.
Эта статья покажет конкретный процесс разработки легкой распределенной RPC -структуры. Эта структура основана на протоколе TCP, обеспечивает функции NIO, обеспечивает эффективные методы сериализации, а также имеет возможность регистрировать и обнаруживать услуги.
Согласно вышеуказанным техническим требованиям, мы можем использовать следующий выбор технологии:
Для связанных зависимостей Maven, пожалуйста, смотрите последнее приложение.
Шаг 1: Напишите интерфейс службы
публичный интерфейс helloService {string hello (string name);}Поместите этот интерфейс в автономный клиент -пакет для использования.
Шаг 2: Напишите класс реализации интерфейса службы
@Rpcservice (helloservice.class) // Укажите удаленный интерфейс открытый класс HelloServiceimpl реализует HelloService {@Override public String Hello (String name) {return "Hello!" + имя; }}Используйте аннотацию RPCService, чтобы определить класс реализации интерфейса сервиса. Вам необходимо указать удаленный интерфейс для класса реализации, потому что класс реализации может реализовать несколько интерфейсов, поэтому вы должны сообщить структуру, которая является удаленным интерфейсом.
Код RPCService выглядит следующим образом:
@Target ({elementType.type})@horesting(RetentionPolicy.runtime)@component // Указывает, что его можно отсканировать с помощью Spring Public @Interface RPCService {class <?> Value ();}Эта аннотация имеет характеристики компонентной аннотации весны и может быть отсканирована к весне.
Этот класс реализации размещен в пакете JAR Server, который также предоставляет некоторые файлы конфигурации сервера и программы начальной загрузки для запуска службы.
Шаг 3: Настройте сервер
Файл конфигурации пружины сервера называется Spring.xml, а содержимое следующим образом:
<Beans ...> <context: component-scan base-package = "com.xxx.rpc.sample.server"/> <Контекст: Property Placeholder location = "classpath: config.properties"/> <!-Configure Service Registration Component-> <Bean Id = "ServiceRegistry"> <constructor-arg negement = " value="${registry.address}"/> </bean> <!-- Configure RPC server--> <bean id="rpcServer"> <constructor-arg name="serverAddress" value="${server.address}"/> <constructor-arg name="serviceRegistry" ref="serviceRegistry"/> </bean></beans>Конкретные параметры конфигурации находятся в файле config.properties, а содержимое следующее:
# Zookeeper Server Registry.address = 127.0.0.1: 2181# RPC Server Server.Address = 127.0.0.1: 8000
Приведенная выше конфигурация указывает, что локальный сервер Zookeeper подключен, а служба RPC выпускается на порту 8000.
Шаг 4: Запустите сервер и опубликуйте службу
Чтобы загрузить файлы конфигурации пружины, чтобы опубликовать службу, просто напишите загрузчик:
открытый класс rpcbootstrap {public static void main (string [] args) {new classpathxmlapplicationContext ("spring.xml"); }}Запустите основной метод класса RPCBootStrap для запуска сервера, но есть два важных компонента, которые еще не были реализованы, а именно: ServiceRegistry и RPCServer. Конкретные детали реализации будут приведены ниже.
Шаг 5: Реализация регистрации услуг
Функция регистрации услуг может быть легко реализована с использованием клиента Zookeeper. Код ServiceRegistry заключается в следующем:
Общедоступный класс ServiceRegistry {Private Static Final Logger Logger = loggerFactory.getLogger (ServicERegistry.class); Private CountDownLatch Latch = новый CountDownLatch (1); частные строки RegistryAddress; Public ServiceRegistry (String RegistryAddress) {this.registryAddress = RegistryAddress; } public void Register (String Data) {if (data! = null) {zookeeper zk = contepserver (); if (zk! = null) {createNode (zk, data); }}} private Zookeeper connectServer () {Zookeeper zk = null; try {zk = new Zookeeper (RegistryAddress, constant.zk_session_timeout, new Watcher () {@Override public void Process (WatchedEvent Event) {if (event.getState () == Event.PokeStrestate.syncConnected) {latch.countdown ();}}}}}); latch.await (); } catch (ioException | прерывание Exception e) {logger.error ("", e); } return zk; } private void createNode (Zookeeper zk, String Data) {try {byte [] bytes = data.getbytes (); String path = zk.create (constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sectient); Logger.debug ("create Zookeeper Node ({} => {})", path, data); } catch (KeeperException | прерывание Exception e) {logger.error ("", e); }}}Среди них все константы настроены через постоянную:
Постоянная публичная интерфейс {int zk_session_timeout = 5000; String zk_registry_path = "/Registry"; String zk_data_path = zk_registry_path + "/data";}Примечание. Во -первых, вам необходимо использовать командную строку клиента Zookeeper для создания/реестра постоянных узлов для хранения всех временных узлов обслуживания.
Шаг 6: Реализация сервера RPC
Использование Netty может реализовать сервер RPC, который поддерживает NIO. Вам необходимо использовать ServiceRegistry для регистрации адреса службы. Код RPCServer заключается в следующем:
открытый класс RPCServer реализует ApplicationContextAware, инициализация bean {private Static Final Logger logger = loggerFactory.getLogger (rpcserver.class); Private String Serverdress; Частная услуга ServiceRegistry ServiceRegistry; частная карта <string, object> handlermap = new HashMap <> (); // Храните соотношение отображения между именем интерфейса и объектом службы public rpcserver (String ServerDress) {this.serveraddress = serveraddress; } public rpcServer (String ServerDress, ServiceRegistry ServiceRegistry) {this.serveraddress = serveraddress; this.serviceRegistry = ServiceRegistry; } @Override public void setApplicationContext (ApplicationContext ctx) Throws BeanSexception {map <string, object> serviceBeanMap = ctx.getBeanSwithannotation (rpcservice.class); // Получить все пружины с аннотациями rpcService Bean if (maputils.isnotempty (servicebeanmap)) {for (Object Servicebean: servicebeanmap.values ()) {String interfacename = servicebean.getClass (). Getannotation (rpcservice.class). handlermap.put (InterfaceName, ServiceBean); }}} @Override public void AfterProperTiesset () throws exception {eventloopgroup bossgroup = new nioeventloopgroup (); EventLoopGroup WorkerGroup = new nioeventloopgroup (); try {serverbootstrap bootstrap = new serverbootstrap (); Bootstrap.Group (BossGroup, WorkerGroup) .Channel (nioServerSocketchannel.class) .ChildHandler (новый канал ChannelInitializer <socketchannel> () {@Override public void initchannel (Socketchannel Channel) throws execure {cannel.pipeline () .addlast (new RpcDecoder (rpcreSt.cleen (Для обработки запроса) .Addlast (новый rpcencoder (rpcresponse.class)) // кодировать ответ RPC (чтобы вернуть ответ) .Addlast (новый rpchandler (handlermap)); String [] array = serveraddress.split (":"); String host = array [0]; int port = integer.parseint (массив [1]); Channelfuture future = bootstrap.bind (хост, порт) .sync (); Logger.debug ("Сервер запускается на порту {}", порт); if (serviceRegistry! = null) {serviceRegistry.register (serveraddress); // Адрес службы регистрации} future.Channel (). CloseFuture (). Sync (); } наконец {korkergroup.shutdowngracebount (); bossgroup.shutdowngracebout (); }}}В приведенном выше коде есть два важных POJO, которые необходимо описать, а именно RPCrequest и RPCresponse.
Используйте rpcrequest для инкапсуляции запросов RPC, код заключается в следующем:
открытый класс rpcrequest {private String requestId; Private String ClassName; Private String Methodname; Частный класс <?> [] parametertypes; частный объект [] параметры; // getter/setter ...}Используйте RPCresponse для инкапсуляции ответа RPC, код заключается в следующем:
открытый класс rpcresponse {private String requestId; Приватная бросаемая ошибка; Результат частного объекта; // getter/setter ...}Используйте RPCDecoder для предоставления декодирования RPC, просто расширяйте метод абстрактного декодирования Netty's BytetomesseDecoder, код выглядит следующим образом:
Public Class RPCDecoder Extends BytetomessedEcoder {private Class <?> GenericClass; public rpcdecoder (class <?> GenericClass) {this.genericClass = genericClass; } @Override public void decode (ChannelHandLercOntext ctx, bytebuf in, list <object> out) throws exception {if (in.readablebytes () <4) {return; } in.markreaderIndex (); int dataLength = in.readint (); if (dataLength <0) {ctx.close (); } if (in.readablebytes () <) <datalength) {in.resetReaderIndex (); возвращаться; } byte [] data = new Byte [dataLength]; in.readbytes (data); Объект obj = serializationUtil.deserialize (data, genericclass); out.add (obj); }}Используйте rpcenceder для обеспечения кодирования RPC, просто расширяйте метод Abstract Class Class MessageTobyteEcoder, код выглядит следующим образом:
Public Class RpcEncoder Extens MessageTobyteencoder {Private Class <?> GenericClass; public rpcencoder (class <?> GenericClass) {this.genericClass = genericClass; } @Override public void Encode (ChannelHandLercOntext CTX, Object in, bytebuf Out) Throws Exception {if (genericClass.isInstance (in)) {byte [] data = serializationutil.serialize (in); out.writeint (data.length); out.writebytes (data); }}}Напишите класс инструмента SerializationUtil и используйте Protostuff для реализации сериализации:
public class serializationutil {частная статическая карта <class <?>, schema <? >> cachedschema = new concurrenthashmap <> (); Частный статический objenesis objenesis = new objenesisstd (true); Private SerializationUtil () {} @SuppressWarnings ("unchecked") private static <t> schema <t> getchema (class <t> cls) {schema <t> schema = (schema <t>) cachedschema.get (cls); if (schema == null) {schema = runtimeschema.createfrom (cls); if (schema! = null) {cachedschema.put (cls, schema); }} return схема; } @Suppresswarnings ("unchecked") public static <t> byte [] serialize (t obj) {class <t> cls = (class <t>) obj.getClass (); LinkedBuffer Buffer = linkedBuffer.allocate (linkedBuffer.default_buffer_size); try {schema <t> schema = getchema (cls); return protostuffioutil.tobytearray (obj, схема, буфер); } catch (Exception e) {бросить new allogalstateException (e.getMessage (), e); } наконец {buffer.clear (); }} public static <t> t deserialize (byte [] data, class <t> cls) {try {t message = (t) objenesis.newinstance (cls); Schema <t> schema = getchema (CLS); Protostuffioutil.mergefrom (данные, сообщение, схема); вернуть сообщение; } catch (Exception e) {бросить new allogalstateException (e.getMessage (), e); }}}Приведенное выше используется objenesis для создания объектов, что является более мощным, чем отражение Java.
Примечание. Если вам нужно заменить другие структуры сериализации, просто измените сериализацию. Конечно, лучшим способом его реализации является предоставление элементов конфигурации, чтобы решить, какой метод сериализации использовать.
Чтобы обрабатывать запросы RPC в RPChandler, вам просто нужно расширить арестный класс Netty's SimpleChannelInboundHandler, код выглядит следующим образом:
Public Class Rpchandler Extends SimpleChannelinBoundHandler <rpcrequest> {Private Static Final Logger = loggerFactory.getLogger (rpchandler.class); Приватная окончательная карта <строка, объект> handlermap; public rpchandler (map <string, object> handlermap) {this.handlermap = handlermap; } @Override public void ChannelRead0 (Final ChannelHandLercontext CTX, RPCRequest запрос) Throws Exception {rpCresponse response = new rpcresponse (); response.setRequestid (request.getRequestid ()); try {object result = handle (request); response.setResult (результат); } catch (throwable t) {response.setError (t); } ctx.WriteAndflush (ответ) .AddListener (ChannelfutureListener.Close); } gative Object Handle (rpcRequest запрос) бросает throwable {String classname = request.getClassname (); Object ServiceBean = handlermap.get (classname); Class <?> ServiceClass = ServiceBean.getClass (); String methodname = request.getMethodName (); Class <?> [] ParameterTypes = request.getParameterTypes (); Object [] parameters = request.getParameters (); /*Метод метода = serviceClass.getMethod (MethodName, ParameterTypes); method.setAccessible (true); method return.invoke (servicebean, parameters);*/ fastclass serviceStclass = fastClass.create (ServiceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod (MethodName, ParameterTypes); return servicefastmethod.invoke (ServiceBean, параметры); } @Override public void exceptionCaught (ChannelHandlerContext CTX, Diskable Canie) {logger.Error ("Исключение по уловке сервера", причина); ctx.close (); }}Чтобы избежать проблем с производительностью, вызванных использованием Java Reflection, мы можем использовать API отражения, предоставленный CGLIB, такой как FastClass и FastMethod, используемый выше.
Шаг 7: Настройте клиент
Также используйте файлы конфигурации пружины для настройки клиента RPC. Код Spring.xml выглядит следующим образом:
<Beans ...> <Контекст: Property Placeholder location = "classpath: config.properties"/> <!-Configure Service Discovery Component-> <Bean Id = "ServiceSiceScovery"> <Constructor-Arg name = "RegistryAddress" value = "$ {registry.address}"/> </bean> <! id = "rpcproxy"> <constructor-arg name = "обслуживаниеconfig.properties предоставляет конкретную конфигурацию:
# Zookeeper Server Registry.address = 127.0.0.1: 2181
Шаг 8: Реализация обнаружения услуг
Также используйте Zookeeper для реализации функции обнаружения услуг, см. Следующий код:
Общедоступный класс обслуживает Private CountDownLatch Latch = новый CountDownLatch (1); Частный летучий список <string> datalist = new ArrayList <> (); частные строки RegistryAddress; Public ServiceScovery (String RegistryAddress) {this.registryAddress = RegistryAddress; Zookeeper zk = connectServer (); if (zk! = null) {watchnode (zk); }} public String Discover () {String Data = null; int size = datalist.size (); if (size> 0) {if (size == 1) {data = datalist.get (0); Logger.debug («Использование только данных: {}», data); } else {data = datalist.get (threadlocalrandom.current (). nextint (size)); Logger.debug («Использование случайных данных: {}», data); }} вернуть данные; } private Zookeeper connectServer () {Zookeeper zk = null; try {zk = new Zookeeper (RegistryAddress, constant.zk_session_timeout, new Watcher () {@Override public void Process (WatchEdevent Event) {if (event.getState () == Event.PokeState.syncConnected) {latch.countdown ();}}); latch.await (); } catch (ioException | прерывание Exception e) {logger.error ("", e); } return zk; } private void watchnode (final Zookeeper zk) {try {list <string> nodelist = zk.getchildren (constant.zk_registry_path, new Watcher () {@override public void Process (watchedevent event) {if (event.gettype () == event.eventtype.nodeChild) {if (event.gettype () == event.eventtype.nodeChild) }}); Список <string> datalist = new ArrayList <> (); for (String node: Nodelist) {byte [] bytes = zk.getData (constant.zk_registry_path + "/" + node, false, null); datalist.add (new String (байты)); } Logger.debug ("Data Node: {}", Datalist); this.datalist = datalist; } catch (KeeperException | прерывание Exception e) {logger.error ("", e); }}}Шаг 9: Реализация агента RPC
Здесь мы используем динамическую прокси -технологию, предоставленную Java для реализации RPC -прокси (конечно, ее также можно реализовать с помощью CGLIB). Конкретный код заключается в следующем:
открытый класс rpcProxy {private String Serverdress; Частное обслуживание в обслуживании; public rpcProxy (String Serverdress) {this.ServerAddress = serverAddress; } public rpcProxy (обслуживание обслуживания) {this.serviceDiscovery = обслуживание; } @Suppresswarnings ("unchecked") public <t> t create (class <?> Interfaceclass) {return (t) proxy.newproxyinstance (terfacecclass.getclassloader (), новый класс <?> {Methodsy), Methody objecty, объект argeSy, объект argeSy, объект argeSy, объект. Throws Throwable {rpcRequest запрос = new RpcRequest (); QUERSE.SetParameterTypes (Method.getParametertypes ()); Integer.parseint (Array [1]); } else {return response.getResult (); }}}); }}Чтобы реализовать клиент RPC с использованием класса RPCClient, вам нужно только расширить абстрактный класс SimpleChannelInboundhandler, предоставленного Netty, код выглядит следующим образом:
Public Class RPCClient расширяет SimpleChannelinBoundHandler <rpcresponse> {private Static Final Logger = loggerFactory.getLogger (rpcClient.class); частный хост строки; частный порт int; частный ответ RPCresponse; Частный конечный объект obj = new Object (); public rpcClient (String Host, int port) {this.host = host; this.port = порт; } @Override public void ChannelRead0 (ChannelHandlercontext CTX, ответ RPCresponse) бросает исключение {this.Response = response; синхронизированный (obj) {obj.notifyall (); // Получить ответ, разбудить поток}} @Override public void ExceptionCaught (ChannelHandlercontext CTX, Diseable Cause) Throws Exception {logger.Error («Exception клиент Exception», Canies); ctx.close (); } public rpCresponse send (rpcRequest -запрос) выбрасывает Exception {eventloopgroup Group = new nioeventloopgroup (); try {bootstrap bootstrap = new Bootstrap (); bootstrap.group (группа) .Channel (niosocketchannel.class) .handler (новый Channelinitializer <socketchannel> () {@override public void initchannel (канал Socketchannel) Throws Exception {channel.pipeline () .Addlast (new RpCencoder (rpcReest.class.class)). Rpcdecoder (rpcresponse.class)) // Декодировать ответ RPC (для обработки ответа) .Addlast (rpcClient.This); Channelfuture future = bootstrap.connect (host, port) .sync (); Future.Channel (). WriteAndflush (запрос) .sync (); синхронизированный (obj) {obj.wait (); // Ответ не был получен, в результате чего поток ожидал} if (response! = Null) {future.channel (). CloseFuture (). Sync (); } return response; } наконец {group.shutdowngracebount (); }}}Шаг 10: Отправить запрос RPC
Используйте Junit, чтобы написать модульный тест в сочетании с Spring, со следующим кодом:
@Runwith (springjunit4classrunner.class) @contextconfiguration (locations = "classpath: spring.xml") public class helloservicetest {@autowired private rpcproxy rpcproxy; @Test public void helloTest () {helloService helloService = rpcproxy.create (helloservice.class); String result = helloService.Hello ("World"); Assert.assertequals ("Привет! Мир", результат); }}Запустите вышеуказанные модульные тесты, и, если ничего неожиданного не происходит, вы должны увидеть зеленый бар.
Суммировать
Эта статья реализует легкую структуру RPC через Spring + Netty + Protostuff + Zookeeper. Он использует Spring для обеспечения инъекции зависимости и конфигурации параметров, использует Netty для реализации передачи данных NiO, использует Protostuff для реализации сериализации объектов и использует Zookeeper для реализации регистрации и обнаружения службы. Используя эту структуру, услуги могут быть развернуты на любом узле в распределенной среде. Клиент вызывает конкретную реализацию сервера через удаленный интерфейс, полностью разделяя разработку сервера и клиента, обеспечивая базовую поддержку для реализации крупномасштабных распределенных приложений.
Приложение: зависимость Maven
<!-junit-> <dependency> <groupid> junit </groupid> <artifactid> junit </artifactid> <serse> 4.11 </version> <cerpope> test </scope> </depertive> <!-slf4j-> <deygepater> <groupid> org.slf4j </artifactid> slf4jjid> <//artifactid> slf4jjid <//artifactid> slf4jjid <//artifactid> slf4jjid> <версия> 1.7.7 </version> </vehyederiny> <!-Spring-> <Dependency> <groupId> org.springframework </GroupId> <strifactid> Spring-context </artifactid> <sersive> 3.2.12.Release </version> </resemendion> <DegeTifId> org.springframework </version> </restifact> <groupeD> ortifact> <serse> 3.2.12.release </version> <cerpope> test </scope> </degency> <!-netty-> <dependency> <groupid> io.netty </groupid> <artifactid> netty-all </artifactid> <serse> 4.0.24.final </version> </redy> <! <groupId> com.dyuproject.protostuff </GroupId> <StrifactId> protoStuff-core </artifactid> <sersive> 1.0.8 </version> </depostuff> <!-Zookeeper-> <Dependency> <groupId> org.apache.zookeeper </GroupeEper-> <Depective> <serse> 3.4.6 </version> </vehyse> <!-Apache Commons Collections-> <Dependency> <groupId> org.apache.commons </GroupId> <strifactid> commons-collections4 </artifactid> <sersive> 4.0 </version> </jeployse> <!-objenesis-> <bervice> <groupd> org.objeses> <! <artifactid> objenesis </artifactid> <sersive> 2.1 </version> </depertive> <!-cglib-> <DepeCted> <groupid> cglib </GroupId> <artifactid> cglib </artifactid> <serse> 3.1 </version> </vehing>