RPC (Remote Procedure Call), put simply, means calling a service on a remote computer as if it were a local one.
RPC can be built on HTTP or directly on TCP. A Web Service is an RPC based on HTTP: it has good cross-platform support, but its performance is not as good as that of RPC based on TCP. Two factors directly affect RPC performance: the transport and the serialization.
TCP is a transport-layer protocol and HTTP is an application-layer protocol that runs on top of it. An RPC built directly on TCP avoids the overhead of HTTP headers and text parsing, so it is generally faster than one built on HTTP. As for serialization, Java provides a default serialization mechanism, but under high concurrency it becomes a performance bottleneck, so a series of excellent serialization frameworks have emerged, such as Protobuf, Kryo, Hessian, and Jackson. They can replace Java's default serialization and perform more efficiently.
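To make the serialization cost concrete, here is a minimal sketch that encodes the same object once with Java's built-in serialization and once with a hand-written compact encoding. The Greeting class and both helper methods are hypothetical and exist only for this comparison; the compact encoding stands in for what frameworks such as Protostuff do and is not their actual wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical message type used only for this comparison.
class Greeting implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final int count;

    Greeting(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

class SerializationSizeSketch {

    // Java's built-in serialization: writes class metadata along with the field values.
    static byte[] javaSerialize(Greeting g) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(g);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hand-written encoding: only the field values, in a fixed order.
    static byte[] compactSerialize(Greeting g) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(g.name);   // 2-byte length prefix + encoded characters
            out.writeInt(g.count);  // 4 bytes
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Greeting g = new Greeting("World", 1);
        System.out.println("Java serialization: " + javaSerialize(g).length + " bytes");
        System.out.println("Compact encoding:   " + compactSerialize(g).length + " bytes");
    }
}
```

The built-in format carries the class name and field descriptors in every stream, which is a large part of why it is both bigger and slower than a schema-based encoding.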
To support high concurrency, traditional blocking IO is not suitable, so we need non-blocking IO (NIO). Java has provided NIO since version 1.4, and Java 7 added NIO.2, which includes asynchronous channels. Implementing NIO directly in Java is within reach, but it requires familiarity with the technical details of NIO.
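As an illustration of what "the technical details of NIO" involve, the following sketch uses only the JDK's java.nio classes: a single thread registers a non-blocking server channel with a Selector, accepts a loopback connection, and reads until the peer closes. The class and method names are hypothetical, and this is a toy round trip under those assumptions, not how the framework in this article is built (it uses Netty, which hides this loop).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class NioSelectorSketch {

    // Sends `message` over a loopback connection and returns what the
    // selector-driven server side read. One thread serves both ends.
    static String roundTrip(String message) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0 = any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
                client.shutdownOutput(); // half-close so the server sees end-of-stream

                ByteArrayOutputStream received = new ByteArrayOutputStream();
                ByteBuffer buffer = ByteBuffer.allocate(256);
                while (true) {
                    selector.select(); // blocks until at least one channel is ready
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isAcceptable()) {
                            SocketChannel accepted = server.accept();
                            if (accepted != null) {
                                accepted.configureBlocking(false);
                                accepted.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            buffer.clear();
                            int n = channel.read(buffer);
                            if (n < 0) { // peer closed: the message is complete
                                channel.close();
                                return new String(received.toByteArray(), StandardCharsets.UTF_8);
                            }
                            received.write(buffer.array(), 0, n);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The point of the selector is that one thread can watch many channels and only touch those that are ready, instead of parking one blocked thread per connection.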
Services need to be deployed on different nodes in a distributed environment, and through service registration the client can automatically discover the currently available services and call them. This requires a service registry component that records the address (hostname and port number) of every service in the distributed environment.
The relationship between the application, service, and service registry is shown in the figure below:
Multiple Services can be published on each Server, and these Services share one host and port. In a distributed environment, multiple Servers jointly provide the Services. In addition, to prevent the Service Registry from becoming a single point of failure, it should be deployed as a cluster.
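The register-and-discover relationship described above can be sketched in memory before any real registry is involved. Everything in this block is hypothetical, including the class name and the choice to key addresses by service name; the framework in this article stores addresses in ZooKeeper instead, and the random pick is only one possible selection policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory stand-in for a service registry.
class InMemoryRegistrySketch {

    // service name -> "host:port" addresses of the servers that publish it
    private final Map<String, List<String>> addressesByService = new ConcurrentHashMap<>();

    // Called by a server when it starts.
    void register(String serviceName, String address) {
        addressesByService
                .computeIfAbsent(serviceName, k -> new CopyOnWriteArrayList<>())
                .add(address);
    }

    // Called when a server goes away (ZooKeeper does this itself for ephemeral nodes).
    void unregister(String serviceName, String address) {
        List<String> addresses = addressesByService.get(serviceName);
        if (addresses != null) {
            addresses.remove(address);
        }
    }

    // Called by a client: picks one available address at random, or null if none is registered.
    String discover(String serviceName) {
        List<String> addresses = addressesByService.get(serviceName);
        if (addresses == null) {
            return null;
        }
        List<String> snapshot = new ArrayList<>(addresses); // stable copy for size() + get()
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(ThreadLocalRandom.current().nextInt(snapshot.size()));
    }

    public static void main(String[] args) {
        InMemoryRegistrySketch registry = new InMemoryRegistrySketch();
        registry.register("HelloService", "127.0.0.1:8000");
        registry.register("HelloService", "127.0.0.1:8001");
        System.out.println("calling " + registry.discover("HelloService"));
    }
}
```

A single in-process map is of course itself a single point of failure, which is why a replicated registry is used in practice.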
This article walks through the development of a lightweight distributed RPC framework. The framework is based on TCP, uses NIO, provides efficient serialization, and supports service registration and discovery.
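Based on the technical requirements above, we can choose the following technologies:
Spring: dependency injection and parameter configuration
Netty: NIO data transmission
Protostuff: object serialization
ZooKeeper: service registration and discovery
For the related Maven dependencies, see the appendix at the end.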
Step 1: Write a service interface

    public interface HelloService {

        String hello(String name);
    }

Place this interface in a standalone client jar package for use.
Step 2: Write the implementation class of the service interface

    @RpcService(HelloService.class) // Specify the remote interface
    public class HelloServiceImpl implements HelloService {

        @Override
        public String hello(String name) {
            return "Hello! " + name;
        }
    }

Use the RpcService annotation to mark the implementation class of the service interface. The annotation must name the remote interface, because the implementation class may implement several interfaces and the framework has to be told which one is exposed remotely.
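The RpcService code is as follows:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    import org.springframework.stereotype.Component;

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component // Indicates that it can be scanned by Spring
    public @interface RpcService {

        Class<?> value();
    }

Because this annotation is itself annotated with Spring's Component annotation, classes that carry it are picked up by Spring's component scan.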
This implementation class is placed in the server jar package, which also provides some server configuration files and bootstrap programs for starting the service.
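How a framework can later recover the remote interface from such an annotation can be tried out without Spring. The sketch below is a minimal illustration using plain reflection; RemoteService, EchoService, and EchoServiceImpl are hypothetical stand-ins (RemoteService mirrors RpcService minus the Component meta-annotation).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for RpcService, without Spring's @Component.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME retention is what makes the lookup below possible
@interface RemoteService {
    Class<?> value();
}

interface EchoService {
    String echo(String text);
}

// Implements two interfaces; the annotation says which one is remote.
@RemoteService(EchoService.class)
class EchoServiceImpl implements EchoService, Runnable {
    @Override
    public String echo(String text) {
        return text;
    }

    @Override
    public void run() {
        // unrelated local behavior, not exposed remotely
    }
}

class AnnotationLookupSketch {

    // Returns the name of the remote interface declared on the bean's class.
    static String remoteInterfaceName(Object bean) {
        RemoteService annotation = bean.getClass().getAnnotation(RemoteService.class);
        if (annotation == null) {
            throw new IllegalArgumentException(bean.getClass().getName() + " is not a remote service");
        }
        return annotation.value().getName();
    }

    public static void main(String[] args) {
        System.out.println(remoteInterfaceName(new EchoServiceImpl()));
    }
}
```

The interface name obtained this way is a natural key for looking up the service object when a request arrives.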
Step 3: Configure the server
The server Spring configuration file is named spring.xml, and the content is as follows:

    <beans ...>
        <context:component-scan base-package="com.xxx.rpc.sample.server"/>

        <context:property-placeholder location="classpath:config.properties"/>

        <!-- Configure the service registration component -->
        <bean id="serviceRegistry">
            <constructor-arg name="registryAddress" value="${registry.address}"/>
        </bean>

        <!-- Configure the RPC server -->
        <bean id="rpcServer">
            <constructor-arg name="serverAddress" value="${server.address}"/>
            <constructor-arg name="serviceRegistry" ref="serviceRegistry"/>
        </bean>
    </beans>

The bean definitions in this listing omit the class attribute; in a real file each bean must name the fully qualified class of ServiceRegistry and RpcServer respectively. The specific configuration parameters are in the config.properties file, and the content is as follows:

    # ZooKeeper server
    registry.address=127.0.0.1:2181

    # RPC server
    server.address=127.0.0.1:8000

With this configuration the server connects to the local ZooKeeper server and publishes the RPC service on port 8000.
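Both values use the host:port form, which the server and client later split into a host and a port. A minimal sketch of loading such a properties file and parsing an address defensively follows; the class and method names are hypothetical, and the properties text is passed in as a string here only so the example runs without a file on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Properties;

class AddressConfigSketch {

    // Loads properties from text; a real application would read config.properties instead.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits "host:port" and validates both parts instead of trusting array indexes.
    static InetSocketAddress parseAddress(String address) {
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || colon == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + address);
        }
        String host = address.substring(0, colon);
        int port = Integer.parseInt(address.substring(colon + 1)); // NumberFormatException if not numeric
        return InetSocketAddress.createUnresolved(host, port);     // rejects ports outside 0..65535
    }

    public static void main(String[] args) {
        Properties props = load(
                "# ZooKeeper server\n"
              + "registry.address=127.0.0.1:2181\n"
              + "# RPC server\n"
              + "server.address=127.0.0.1:8000\n");
        InetSocketAddress server = parseAddress(props.getProperty("server.address"));
        System.out.println(server.getHostString() + " port " + server.getPort());
    }
}
```

Failing fast on a malformed address at startup is usually easier to diagnose than an ArrayIndexOutOfBoundsException from a bare split(":").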
Step 4: Start the server and publish the service
To publish the service, write a bootstrap class that loads the Spring configuration file:

    import org.springframework.context.support.ClassPathXmlApplicationContext;

    public class RpcBootstrap {

        public static void main(String[] args) {
            new ClassPathXmlApplicationContext("spring.xml");
        }
    }

Run the main method of the RpcBootstrap class to start the server. Two important components have not been implemented yet, namely ServiceRegistry and RpcServer; their implementation details are given below.
Step 5: Implement service registration
The service registration function can be implemented easily with the ZooKeeper client. The ServiceRegistry code is as follows:

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ServiceRegistry {

        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);

        private CountDownLatch latch = new CountDownLatch(1);

        private String registryAddress;

        public ServiceRegistry(String registryAddress) {
            this.registryAddress = registryAddress;
        }

        public void register(String data) {
            if (data != null) {
                ZooKeeper zk = connectServer();
                if (zk != null) {
                    createNode(zk, data);
                }
            }
        }

        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await(); // Block until the session is connected
            } catch (IOException | InterruptedException e) {
                LOGGER.error("failed to connect to ZooKeeper", e);
            }
            return zk;
        }

        private void createNode(ZooKeeper zk, String data) {
            try {
                byte[] bytes = data.getBytes();
                // Ephemeral node: removed automatically when this server's session ends
                String path = zk.create(Constant.ZK_DATA_PATH, bytes,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                LOGGER.debug("create zookeeper node ({} => {})", path, data);
            } catch (KeeperException | InterruptedException e) {
                LOGGER.error("failed to create ZooKeeper node", e);
            }
        }
    }

All constants are defined in Constant:

    public interface Constant {

        int ZK_SESSION_TIMEOUT = 5000;

        String ZK_REGISTRY_PATH = "/registry";
        String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";
    }

Note: before starting the server, use the ZooKeeper command-line client to create the persistent /registry node, which holds the ephemeral nodes of all services.
Step 6: Implement the RPC server
Netty makes it possible to implement an RPC server that supports NIO. The server uses ServiceRegistry to register its address. The RpcServer code is as follows:

    import java.util.HashMap;
    import java.util.Map;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import org.apache.commons.collections4.MapUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;

    public class RpcServer implements ApplicationContextAware, InitializingBean {

        private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);

        private String serverAddress;
        private ServiceRegistry serviceRegistry;

        // Maps each remote interface name to the service object that implements it
        private Map<String, Object> handlerMap = new HashMap<>();

        public RpcServer(String serverAddress) {
            this.serverAddress = serverAddress;
        }

        public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
            this.serverAddress = serverAddress;
            this.serviceRegistry = serviceRegistry;
        }

        @Override
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            // Get all Spring beans that carry the RpcService annotation
            Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
            if (MapUtils.isNotEmpty(serviceBeanMap)) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                .addLast(new RpcDecoder(RpcRequest.class)) // Decode the RPC request (to handle the request)
                                .addLast(new RpcEncoder(RpcResponse.class)) // Encode the RPC response (to return the response)
                                .addLast(new RpcHandler(handlerMap)); // Handle the RPC request
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);

                ChannelFuture future = bootstrap.bind(host, port).sync();
                LOGGER.debug("server started on port {}", port);

                if (serviceRegistry != null) {
                    serviceRegistry.register(serverAddress); // Register the service address
                }

                future.channel().closeFuture().sync(); // Block until the server channel is closed
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }

The code above relies on two important POJOs that need to be described, namely RpcRequest and RpcResponse.
Use RpcRequest to encapsulate an RPC request:

    public class RpcRequest {

        private String requestId;
        private String className;
        private String methodName;
        private Class<?>[] parameterTypes;
        private Object[] parameters;

        // getter/setter...
    }

Use RpcResponse to encapsulate an RPC response:

    public class RpcResponse {

        private String requestId;
        private Throwable error;
        private Object result;

        // getter/setter...
    }

Use RpcDecoder to provide RPC decoding by extending Netty's ByteToMessageDecoder abstract class and implementing its decode method:

    import java.util.List;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;

    public class RpcDecoder extends ByteToMessageDecoder {

        private Class<?> genericClass;

        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }

        @Override
        public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return; // The 4-byte length header has not fully arrived yet
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            if (dataLength < 0) {
                ctx.close();
                return; // Invalid frame: stop decoding after closing the channel
            }
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex(); // Body incomplete: rewind and wait for more bytes
                return;
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);

            Object obj = SerializationUtil.deserialize(data, genericClass);
            out.add(obj);
        }
    }

Use RpcEncoder to provide RPC encoding by extending Netty's MessageToByteEncoder abstract class and implementing its encode method:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;

    public class RpcEncoder extends MessageToByteEncoder<Object> {

        private Class<?> genericClass;

        public RpcEncoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }

        @Override
        public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
            if (genericClass.isInstance(in)) {
                byte[] data = SerializationUtil.serialize(in);
                out.writeInt(data.length); // 4-byte length header
                out.writeBytes(data);      // Serialized body
            }
        }
    }

Write a SerializationUtil utility class that uses Protostuff to implement serialization:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    import org.objenesis.Objenesis;
    import org.objenesis.ObjenesisStd;

    public class SerializationUtil {

        // Schemas are expensive to build, so cache one per class
        private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

        private static Objenesis objenesis = new ObjenesisStd(true);

        private SerializationUtil() {
        }

        @SuppressWarnings("unchecked")
        private static <T> Schema<T> getSchema(Class<T> cls) {
            Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
            if (schema == null) {
                schema = RuntimeSchema.createFrom(cls);
                if (schema != null) {
                    cachedSchema.put(cls, schema);
                }
            }
            return schema;
        }

        @SuppressWarnings("unchecked")
        public static <T> byte[] serialize(T obj) {
            Class<T> cls = (Class<T>) obj.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema<T> schema = getSchema(cls);
                return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }

        public static <T> T deserialize(byte[] data, Class<T> cls) {
            try {
                T message = objenesis.newInstance(cls);
                Schema<T> schema = getSchema(cls);
                ProtostuffIOUtil.mergeFrom(data, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    }

The code above uses Objenesis to instantiate objects; unlike Class.newInstance(), it can create an instance even when the class has no no-argument constructor.
Note: to switch to another serialization framework, only SerializationUtil needs to change. A better approach is to provide a configuration item that selects the serialization method.
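Independent of which serializer is plugged in, the bytes travel in the length-prefixed frames produced by RpcEncoder and consumed by RpcDecoder. The framing logic can be exercised on its own with java.nio.ByteBuffer, as in this minimal sketch; the class name is hypothetical, and ByteBuffer's mark/reset stands in for Netty's markReaderIndex/resetReaderIndex.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFramingSketch {

    // Frame layout: 4-byte big-endian length, followed by that many payload bytes.
    static byte[] encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);
        frame.put(payload);
        return frame.array();
    }

    // Extracts every complete frame from `in`; bytes of an incomplete
    // trailing frame stay unread so they can be retried when more data arrives.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // body not complete yet: rewind to the start of the header
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(64);
        in.put(encode("hello".getBytes(StandardCharsets.UTF_8)));
        in.put(encode("rpc".getBytes(StandardCharsets.UTF_8)));
        in.putInt(10).put((byte) 1); // header of a third frame plus only 1 of its 10 bytes
        in.flip();

        for (byte[] frame : decode(in)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
        System.out.println("bytes left for the next read: " + in.remaining());
    }
}
```

TCP delivers a byte stream without message boundaries, so the receiver may see half a message or several at once; the length header is what lets it cut the stream back into whole requests and responses.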
To handle RPC requests in RpcHandler, extend Netty's SimpleChannelInboundHandler abstract class. The code is as follows:

    import java.util.Map;

    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import net.sf.cglib.reflect.FastClass;
    import net.sf.cglib.reflect.FastMethod;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

        private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);

        private final Map<String, Object> handlerMap;

        public RpcHandler(Map<String, Object> handlerMap) {
            this.handlerMap = handlerMap;
        }

        @Override
        public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
            RpcResponse response = new RpcResponse();
            response.setRequestId(request.getRequestId());
            try {
                Object result = handle(request);
                response.setResult(result);
            } catch (Throwable t) {
                response.setError(t);
            }
            // Send the response, then close the connection (one request per connection)
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }

        private Object handle(RpcRequest request) throws Throwable {
            String className = request.getClassName();
            Object serviceBean = handlerMap.get(className);
            if (serviceBean == null) {
                throw new IllegalArgumentException("no service published for " + className);
            }

            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            /* Plain Java reflection alternative:
            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(serviceBean, parameters);*/

            FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            return serviceFastMethod.invoke(serviceBean, parameters);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOGGER.error("server caught exception", cause);
            ctx.close();
        }
    }

To avoid the performance cost of Java reflection, the handler uses the reflection API provided by CGLib, namely the FastClass and FastMethod classes shown above.
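The dispatch step itself (look up the service object by interface name, then invoke the requested method) can be tried out with plain java.lang.reflect, the alternative left commented out in the listing. The sketch below is a minimal, JDK-only illustration; Calculator, CalculatorImpl, and the class and method names are hypothetical, and it is not a substitute for the FastClass version.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

interface Calculator {
    int add(int a, int b);
}

class CalculatorImpl implements Calculator {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
}

class ReflectiveDispatchSketch {

    // interface name -> service object, like handlerMap in RpcServer
    private final Map<String, Object> handlerMap = new HashMap<>();

    void publish(Class<?> remoteInterface, Object serviceBean) {
        handlerMap.put(remoteInterface.getName(), serviceBean);
    }

    // Finds the service object for the interface name and invokes the named method on it.
    Object dispatch(String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
        Object serviceBean = handlerMap.get(className);
        if (serviceBean == null) {
            throw new IllegalArgumentException("no service published for " + className);
        }
        try {
            Method method = serviceBean.getClass().getMethod(methodName, parameterTypes);
            method.setAccessible(true); // the implementing class may not be public
            return method.invoke(serviceBean, parameters);
        } catch (InvocationTargetException e) {
            // The service method itself threw: surface its exception as the cause
            throw new IllegalStateException("service method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatchSketch server = new ReflectiveDispatchSketch();
        server.publish(Calculator.class, new CalculatorImpl());
        Object result = server.dispatch(Calculator.class.getName(), "add",
                new Class<?>[] {int.class, int.class}, new Object[] {2, 3});
        System.out.println(result);
    }
}
```

Note that the parameter types travel with the request: without them, overloaded methods with the same name could not be told apart.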
Step 7: Configure the client
The RPC client is also configured with a Spring configuration file. The spring.xml code is as follows:

    <beans ...>
        <context:property-placeholder location="classpath:config.properties"/>

        <!-- Configure the service discovery component -->
        <bean id="serviceDiscovery">
            <constructor-arg name="registryAddress" value="${registry.address}"/>
        </bean>

        <!-- Configure the RPC proxy -->
        <bean id="rpcProxy">
            <constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>
        </bean>
    </beans>

config.properties provides the specific configuration:

    # ZooKeeper server
    registry.address=127.0.0.1:2181

Step 8: Implement service discovery
Service discovery is also implemented with ZooKeeper, as the following code shows:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ServiceDiscovery {

        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);

        private CountDownLatch latch = new CountDownLatch(1);

        // Replaced as a whole on every update, so readers always see a consistent list
        private volatile List<String> dataList = new ArrayList<>();

        private String registryAddress;

        public ServiceDiscovery(String registryAddress) {
            this.registryAddress = registryAddress;

            ZooKeeper zk = connectServer();
            if (zk != null) {
                watchNode(zk);
            }
        }

        public String discover() {
            String data = null;
            int size = dataList.size();
            if (size > 0) {
                if (size == 1) {
                    data = dataList.get(0);
                    LOGGER.debug("using only data: {}", data);
                } else {
                    data = dataList.get(ThreadLocalRandom.current().nextInt(size));
                    LOGGER.debug("using random data: {}", data);
                }
            }
            return data;
        }

        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await(); // Block until the session is connected
            } catch (IOException | InterruptedException e) {
                LOGGER.error("failed to connect to ZooKeeper", e);
            }
            return zk;
        }

        private void watchNode(final ZooKeeper zk) {
            try {
                List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            watchNode(zk); // Watches fire once, so re-read and re-register
                        }
                    }
                });
                List<String> dataList = new ArrayList<>();
                for (String node : nodeList) {
                    byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
                    dataList.add(new String(bytes));
                }
                LOGGER.debug("node data: {}", dataList);
                this.dataList = dataList;
            } catch (KeeperException | InterruptedException e) {
                LOGGER.error("failed to read service addresses from ZooKeeper", e);
            }
        }
    }

Step 9: Implement the RPC proxy
Here we use the dynamic proxy mechanism provided by Java to implement the RPC proxy (it could also be implemented with CGLib). The code is as follows:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.UUID;

    public class RpcProxy {

        private String serverAddress;
        private ServiceDiscovery serviceDiscovery;

        public RpcProxy(String serverAddress) {
            this.serverAddress = serverAddress;
        }

        public RpcProxy(ServiceDiscovery serviceDiscovery) {
            this.serviceDiscovery = serviceDiscovery;
        }

        @SuppressWarnings("unchecked")
        public <T> T create(Class<?> interfaceClass) {
            return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        RpcRequest request = new RpcRequest(); // Create and initialize the RPC request
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);

                        if (serviceDiscovery != null) {
                            serverAddress = serviceDiscovery.discover(); // Discover the service
                        }
                        if (serverAddress == null) {
                            throw new IllegalStateException("no available server address");
                        }

                        String[] array = serverAddress.split(":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);

                        RpcClient client = new RpcClient(host, port); // Initialize the RPC client
                        RpcResponse response = client.send(request); // Send the RPC request and get the RPC response

                        if (response.getError() != null) {
                            throw response.getError();
                        } else {
                            return response.getResult();
                        }
                    }
                }
            );
        }
    }

The RPC client is implemented by the RpcClient class, which extends the SimpleChannelInboundHandler abstract class provided by Netty. The code is as follows:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

        private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);

        private String host;
        private int port;

        private RpcResponse response; // Guarded by obj

        private final Object obj = new Object();

        public RpcClient(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
            synchronized (obj) {
                this.response = response;
                obj.notifyAll(); // A response arrived: wake up the waiting thread
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("client caught exception", cause);
            ctx.close();
        }

        public RpcResponse send(RpcRequest request) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                .addLast(new RpcEncoder(RpcRequest.class)) // Encode the RPC request (to send the request)
                                .addLast(new RpcDecoder(RpcResponse.class)) // Decode the RPC response (to handle the response)
                                .addLast(RpcClient.this); // Use RpcClient to receive the RPC response
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE, true);

                ChannelFuture future = bootstrap.connect(host, port).sync();
                future.channel().writeAndFlush(request).sync();

                synchronized (obj) {
                    // Check the condition in a loop: the response may already have
                    // arrived before we get here, and wait() can wake up spuriously
                    while (response == null) {
                        obj.wait();
                    }
                }

                future.channel().closeFuture().sync();
                return response;
            } finally {
                group.shutdownGracefully();
            }
        }
    }

Step 10: Send an RPC request
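Use JUnit together with Spring to write a unit test:

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring.xml")
    public class HelloServiceTest {

        @Autowired
        private RpcProxy rpcProxy;

        @Test
        public void helloTest() {
            HelloService helloService = rpcProxy.create(HelloService.class);
            String result = helloService.hello("World");
            Assert.assertEquals("Hello! World", result);
        }
    }

Run the unit test above with the server started; if nothing unexpected happens, you should see the green bar.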
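To see the client-side mechanics from Steps 9 and 10 in isolation, the following sketch builds a JDK dynamic proxy whose handler waits for an asynchronous reply. It makes several assumptions that differ from the article's code: the network is replaced by a hypothetical in-process fakeServer method, the response is awaited through a CompletableFuture with a timeout rather than wait/notify, and Greeter and all other names are made up for the example.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

interface Greeter {
    String greet(String name);
}

class ProxyCallSketch {

    // Hypothetical stand-in for the network: answers on another thread.
    static CompletableFuture<Object> fakeServer(String methodName, Object[] args) {
        return CompletableFuture.supplyAsync(() -> "Hello! " + args[0]);
    }

    // Creates a proxy that forwards every interface call to `transport` and waits for the reply.
    static <T> T create(Class<T> iface, BiFunction<String, Object[], CompletableFuture<Object>> transport) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (self, method, args) -> {
                    // equals/hashCode/toString also reach the handler; answer them locally
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "equals": return self == args[0];
                            case "hashCode": return System.identityHashCode(self);
                            default: return iface.getSimpleName() + " proxy";
                        }
                    }
                    CompletableFuture<Object> pending = transport.apply(method.getName(), args);
                    try {
                        // Bounded wait: a lost response cannot block the caller forever
                        return pending.get(5, TimeUnit.SECONDS);
                    } catch (ExecutionException e) {
                        throw e.getCause(); // rethrow the remote failure to the caller
                    } catch (TimeoutException e) {
                        throw new IllegalStateException("no response within 5 seconds", e);
                    }
                });
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        Greeter greeter = create(Greeter.class, ProxyCallSketch::fakeServer);
        System.out.println(greeter.greet("World")); // prints "Hello! World"
    }
}
```

From the caller's point of view greeter is an ordinary Greeter; only the invocation handler knows that the call leaves the process, which is exactly the illusion RPC aims for.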
Summary
This article implements a lightweight RPC framework with Spring + Netty + Protostuff + ZooKeeper. It uses Spring for dependency injection and parameter configuration, Netty for NIO data transmission, Protostuff for object serialization, and ZooKeeper for service registration and discovery. With this framework, services can be deployed on any node in a distributed environment. The client calls the server-side implementation through a remote interface, which completely separates the development of the server from that of the client and provides basic support for building large-scale distributed applications.
Appendix: Maven dependencies

    <!-- JUnit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- SLF4J -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
    </dependency>

    <!-- Spring -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.12.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>3.2.12.RELEASE</version>
        <scope>test</scope>
    </dependency>

    <!-- Netty -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.0.24.Final</version>
    </dependency>

    <!-- Protostuff (RuntimeSchema, used by SerializationUtil, lives in protostuff-runtime) -->
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-core</artifactId>
        <version>1.0.8</version>
    </dependency>
    <dependency>
        <groupId>com.dyuproject.protostuff</groupId>
        <artifactId>protostuff-runtime</artifactId>
        <version>1.0.8</version>
    </dependency>

    <!-- ZooKeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>

    <!-- Apache Commons Collections -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.0</version>
    </dependency>

    <!-- Objenesis -->
    <dependency>
        <groupId>org.objenesis</groupId>
        <artifactId>objenesis</artifactId>
        <version>2.1</version>
    </dependency>

    <!-- CGLib -->
    <dependency>
        <groupId>cglib</groupId>
        <artifactId>cglib</artifactId>
        <version>3.1</version>
    </dependency>