RPC หรือการโทรขั้นตอนระยะไกลเป็นเพียงการใช้งานง่ายๆ: การโทรบริการบนคอมพิวเตอร์ระยะไกลก็เหมือนกับการโทรบริการท้องถิ่น
RPC สามารถขึ้นอยู่กับโปรโตคอล HTTP หรือ TCP บริการเว็บเป็น RPC ตามโปรโตคอล HTTP มันมีประสิทธิภาพข้ามแพลตฟอร์มที่ดี แต่ประสิทธิภาพของมันไม่ดีเท่า RPC ตามโปรโตคอล TCP สองด้านจะส่งผลโดยตรงต่อประสิทธิภาพของ RPC หนึ่งคือวิธีการส่งสัญญาณและอีกวิธีหนึ่งคือการทำให้เป็นอนุกรม
อย่างที่เราทราบกันดีว่า TCP เป็นโปรโตคอลการขนส่งเลเยอร์ HTTP เป็นโปรโตคอลเลเยอร์แอปพลิเคชันและชั้นการขนส่งอยู่ภายใต้เลเยอร์แอปพลิเคชันมากขึ้น ในแง่ของการส่งข้อมูลชั้นล่างจะเร็วขึ้น ดังนั้นโดยทั่วไป TCP จะต้องเร็วกว่า HTTP สำหรับการทำให้เป็นอนุกรม Java ให้วิธีการทำให้เป็นอนุกรมเริ่มต้น แต่ในกรณีที่มีการเกิดขึ้นพร้อมกันสูงวิธีนี้จะนำคอขวดประสิทธิภาพบางอย่างดังนั้นชุดของกรอบการทำให้เป็นอนุกรมที่ยอดเยี่ยมได้เกิดขึ้นในตลาดเช่น: Protobuf, Kryo, Hessian, Jackson ฯลฯ
เพื่อรองรับการทำงานร่วมกันที่สูงการปิดกั้นแบบดั้งเดิม IO นั้นไม่เหมาะสมดังนั้นเราจึงต้องการ IO แบบอะซิงโครนัสเช่น Nio Java ให้บริการโซลูชั่น NIO และ Java 7 ยังให้การสนับสนุน NIO.2 ที่ดีขึ้น การใช้ NIO ด้วย Java ไม่ใช่สิ่งที่ห่างไกล แต่เราจำเป็นต้องคุ้นเคยกับรายละเอียดทางเทคนิคของ NIO
เราจำเป็นต้องปรับใช้บริการบนโหนดที่แตกต่างกันในสภาพแวดล้อมแบบกระจายและผ่านการลงทะเบียนบริการลูกค้าสามารถค้นพบบริการที่มีอยู่ในปัจจุบันโดยอัตโนมัติและโทรหาบริการเหล่านี้ สิ่งนี้ต้องการส่วนประกอบรีจิสทรีบริการเพื่อลงทะเบียนที่อยู่บริการทั้งหมด (รวมถึง: ชื่อโฮสต์และหมายเลขพอร์ต) ในสภาพแวดล้อมแบบกระจาย
ความสัมพันธ์ระหว่างแอปพลิเคชันบริการและรีจิสทรีบริการแสดงในรูปด้านล่าง:
บริการหลายบริการสามารถเผยแพร่ในแต่ละเซิร์ฟเวอร์ บริการเหล่านี้แบ่งปันโฮสต์และพอร์ต ในสภาพแวดล้อมแบบกระจายเซิร์ฟเวอร์จะได้รับการจัดหาบริการร่วมกัน นอกจากนี้เพื่อป้องกันไม่ให้จุดเดียวของความล้มเหลวของรีจิสทรีบริการจำเป็นต้องสร้างขึ้นในสภาพแวดล้อมคลัสเตอร์
บทความนี้จะเปิดเผยกระบวนการเฉพาะของการพัฒนากรอบ RPC แบบกระจายน้ำหนักเบา เฟรมเวิร์กนี้ขึ้นอยู่กับโปรโตคอล TCP ให้คุณสมบัติ NIO ให้วิธีการทำให้เป็นอนุกรมที่มีประสิทธิภาพและยังมีความสามารถในการลงทะเบียนและค้นหาบริการ
ตามข้อกำหนดทางเทคนิคข้างต้นเราสามารถใช้การเลือกเทคโนโลยีต่อไปนี้:
สำหรับการพึ่งพา Maven ที่เกี่ยวข้องโปรดดูภาคผนวกสุดท้าย
ขั้นตอนที่ 1: เขียนอินเทอร์เฟซบริการ
อินเทอร์เฟซสาธารณะ helloService {สตริงสวัสดี (ชื่อสตริง);}วางอินเทอร์เฟซนี้ในแพ็คเกจ Jar ไคลเอนต์แบบสแตนด์อโลนเพื่อใช้งาน
ขั้นตอนที่ 2: เขียนคลาสการใช้งานของอินเทอร์เฟซบริการ
@RPCService (helloService.class) // ระบุอินเตอร์เฟสระยะไกลคลาสสาธารณะ HelloServiceImpl ใช้ HelloService {@Override Public String สวัสดี (ชื่อสตริง) {return "Hello!" + ชื่อ; -ใช้คำอธิบายประกอบ RPCService เพื่อกำหนดคลาสการใช้งานของอินเทอร์เฟซบริการ คุณต้องระบุอินเทอร์เฟซระยะไกลสำหรับคลาสการใช้งานเนื่องจากคลาสการใช้งานอาจใช้หลายอินเทอร์เฟซดังนั้นคุณต้องบอกเฟรมเวิร์กซึ่งเป็นอินเทอร์เฟซระยะไกล
รหัส RPCService มีดังนี้:
@target ({elementtype.type})@retention(retentionpolicy.runtime)@componentคำอธิบายประกอบนี้มีลักษณะของคำอธิบายประกอบส่วนประกอบของฤดูใบไม้ผลิและสามารถสแกนได้โดยฤดูใบไม้ผลิ
คลาสการใช้งานนี้ถูกวางไว้ในแพ็คเกจ Server Jar ซึ่งยังมีไฟล์การกำหนดค่าเซิร์ฟเวอร์และโปรแกรม bootstrap สำหรับการเริ่มต้นบริการ
ขั้นตอนที่ 3: กำหนดค่าเซิร์ฟเวอร์
ไฟล์การกำหนดค่าสปริงเซิร์ฟเวอร์มีชื่อว่า Spring.xml และเนื้อหามีดังนี้:
<Beans ... > <บริบท: Component-Scan base-package = "com.xxx.rpc.sample.server"/> <บริบท: สถานที่ตั้งของผู้ถือครองตำแหน่ง = "classpath: config.properties"/> <!-กำหนดค่าคอมโพเนนต์การลงทะเบียนบริการ-> <bean id = value = "$ {registry.address}"/> </ebean> <!-กำหนดค่าเซิร์ฟเวอร์ rpc-> <bean id = "rpcserver"> <constructor-arg name = "serveraddress" value = "$ {server.address}"/>พารามิเตอร์การกำหนดค่าเฉพาะอยู่ในไฟล์ config.properties และเนื้อหามีดังนี้:
# zookeeper server registry.address = 127.0.0.1: 2181# เซิร์ฟเวอร์เซิร์ฟเวอร์ RPC เซิร์ฟเวอร์ = 127.0.0.1: 8000
การกำหนดค่าข้างต้นบ่งชี้ว่าเซิร์ฟเวอร์ Zookeeper ท้องถิ่นเชื่อมต่อกันและบริการ RPC ถูกปล่อยออกมาบนพอร์ต 8000
ขั้นตอนที่ 4: เริ่มเซิร์ฟเวอร์และเผยแพร่บริการ
ในการโหลดไฟล์การกำหนดค่าสปริงเพื่อเผยแพร่บริการเพียงแค่เขียน bootloader:
คลาสสาธารณะ RPCBOOTSTRAP {โมฆะสาธารณะคงที่หลัก (สตริง [] args) {ใหม่ classPathxMlApplicationContext ("Spring.xml"); -เรียกใช้วิธีหลักของคลาส RPCBootstrap เพื่อเริ่มต้นเซิร์ฟเวอร์ แต่มีองค์ประกอบสำคัญสองประการที่ยังไม่ได้ใช้งานคือ: Serviceregistry และ RPCServer รายละเอียดการใช้งานเฉพาะจะได้รับด้านล่าง
ขั้นตอนที่ 5: ใช้การลงทะเบียนบริการ
ฟังก์ชั่นการลงทะเบียนบริการสามารถนำไปใช้ได้อย่างง่ายดายโดยใช้ไคลเอนต์ Zookeeper รหัส serviceregistry มีดังนี้:
serviceregistry ระดับสาธารณะ {ส่วนตัว logger สุดท้ายคงที่ logger = loggerFactory.getLogger (serviceregistry.class); private countdownlatch latch = new countdownlatch (1); registryaddress สตริงส่วนตัว; Serviceregistry สาธารณะ (String RegistryAddress) {this.registryAddress = RegistryAddress; } การลงทะเบียนโมฆะสาธารณะ (ข้อมูลสตริง) {if (data! = null) {zookeeper zk = ConnectServer (); if (zk! = null) {createnode (zk, data); }}} zookeeper ส่วนตัว ConnectServer () {zookeeper zk = null; ลอง {zk = new zookeeper (registryAddress, constant.zk_session_timeout, new Watcher () {@Override กระบวนการโมฆะสาธารณะ (เหตุการณ์ดู Event) {ถ้า (Event.getState () == Event.key.SyncConnected) {latch.countdown ();}}}}}}} latch.await (); } catch (ioexception | interruptedException e) {logger.error ("", e); } return zk; } โมฆะส่วนตัว createNode (Zookeeper ZK, ข้อมูลสตริง) {ลอง {byte [] bytes = data.getBytes (); String Path = zk.create (constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createMode.ephemeral_equential); logger.debug ("สร้าง zookeeper node ({} => {})", path, data); } catch (GeterException | interruptedException e) {logger.error ("", e); -ในหมู่พวกเขาค่าคงที่ทั้งหมดได้รับการกำหนดค่าผ่านค่าคงที่:
ค่าคงที่อินเตอร์เฟสสาธารณะ {int zk_session_timeout = 5000; String zk_registry_path = "/รีจิสทรี"; String zk_data_path = zk_registry_path + "/data";}หมายเหตุ: ก่อนอื่นคุณต้องใช้บรรทัดคำสั่งไคลเอนต์ Zookeeper เพื่อสร้าง/รีจิสทรีโหนดถาวรเพื่อจัดเก็บโหนดบริการชั่วคราวทั้งหมด
ขั้นตอนที่ 6: ใช้เซิร์ฟเวอร์ RPC
การใช้ Netty สามารถใช้เซิร์ฟเวอร์ RPC ที่รองรับ NIO คุณต้องใช้ serviceregistry เพื่อลงทะเบียนที่อยู่บริการ รหัส RPCServer มีดังนี้:
คลาสสาธารณะ RPCSERVER ใช้ ApplicationContextAware, InitializingBean {Logger Logger สุดท้ายคงที่ = LoggerFactory.getLogger (rpcserver.class); Serveraddress สตริงส่วนตัว; Serviceregistry ส่วนตัว; แผนที่ส่วนตัว <สตริงวัตถุ> handlermap = new hashmap <> (); // จัดเก็บความสัมพันธ์การแมประหว่างชื่ออินเตอร์เฟสและวัตถุบริการสาธารณะ RPCServer (String ServerAddress) {this.serverAddress = ServerAddress; } RPCServer สาธารณะ (String ServerAddress, ServicereGistry ServicereGistry) {this.serveraddress = ServerAddress; this.serviceregistry = serviceregistry; } @Override โมฆะสาธารณะ setApplicationContext (ApplicationContext CTX) พ่น beansexception {แผนที่ <สตริงวัตถุ> serviceBeanMap = ctx.getBeanswithannotation (rpcservice.class); // รับสปริงทั้งหมดด้วยคำอธิบายประกอบ RPCService Bean ถ้า (maputils.isnotEmpty (ServiceBeanMap)) {สำหรับ (Object ServiceBean: ServiceBeanMap.Values ()) {String Interfacename = ServiceBean.getClass (). getNannotation (rpcservice.class) handlermap.put (Interfacename, ServiceBean); }}} @Override โมฆะสาธารณะ AfterPropertIesset () พ่นข้อยกเว้น {EventLoopGroup BossGroup = ใหม่ nioEventLoopGroup (); EventLoopGroup WorkerGroup = ใหม่ nioEventLoopGroup (); ลอง {serverbootstrap bootstrap = new ServerBootStrap (); bootstrap.group (Bossgroup, WorkerGroup). Channel (nioserversocketChannel.class) .childhandler (ช่องใหม่ channelInitializer <OCKETCHANNEL> () {@Override โมฆะ public void initchannel (socketchannel channel) คำขอ RPC (เพื่อจัดการกับคำขอ) .addlast (ใหม่ rpcencoder (rpcresponse.class)) // เข้ารหัสการตอบสนอง RPC (เพื่อส่งคืนการตอบกลับ). addlast (ใหม่ rpchandler (handlermap); สตริง [] array = serveraddress.split (":"); สตริงโฮสต์ = อาร์เรย์ [0]; int port = integer.parseint (อาร์เรย์ [1]); Channelfuture Future = bootstrap.bind (โฮสต์, พอร์ต) .sync (); logger.debug ("เซิร์ฟเวอร์เริ่มต้นบนพอร์ต {}", พอร์ต); if (serviceregistry! = null) {serviceregistry.register (serveraddress); // ที่อยู่บริการลงทะเบียน} future.channel (). CloseFuture (). sync (); } ในที่สุด {workerGroup.shutdowngracefully (); bossgroup.shutdowngracefully (); -ในรหัสข้างต้นมีสอง pojos สำคัญที่ต้องอธิบายคือ rpcrequest และ rpcresponse
ใช้ rpcrequest เพื่อห่อหุ้มคำขอ RPC รหัสมีดังนี้:
คลาสสาธารณะ RPCRequest {Private String RequestId; คลาสสตริงส่วนตัว ชื่อสตริงส่วนตัว ชั้นเรียนส่วนตัว <?> [] พารามิเตอร์ therTypes; วัตถุส่วนตัว [] พารามิเตอร์; // getter/setter ... }ใช้ rpcresponse เพื่อห่อหุ้มการตอบสนอง RPC รหัสมีดังนี้:
คลาสสาธารณะ rpcresponse {private string requestId; ข้อผิดพลาดที่สามารถโยนได้ส่วนตัว ผลลัพธ์วัตถุส่วนตัว; // getter/setter ... }ใช้ RPCDecoder เพื่อให้การถอดรหัส RPC เพียงแค่ขยายวิธีการถอดรหัสคลาส Abstract Class ของ Netty ของ Netty โค้ดมีดังนี้:
RPCDecoder ระดับสาธารณะขยาย BytetOmessagedEcoder {คลาสส่วนตัว <?> GenericClass; สาธารณะ rpcdecoder (คลาส <?> genericClass) {this.genericClass = genericClass; } @Override โมฆะสาธารณะ Decode (channelhandlerContext ctx, byteBuf in, รายการ <jobch> ออก) โยนข้อยกเว้น {ถ้า (in.readableBytes () <4) {return; } in.markreaderIndex (); int datalength = in.readint (); if (datalength <0) {ctx.close (); } if (in.readableBytes () <datalength) {in.resetReaderIndex (); กลับ; } byte [] data = byte ใหม่ [datalength]; in.readbytes (ข้อมูล); Object obj = serializationutil.deserialize (data, genericClass); out.add (obj); -ใช้ rpcencoder เพื่อให้การเข้ารหัส RPC เพียงแค่ขยายวิธีการเข้ารหัสคลาสนามธรรมของ MessagetobyTencoder ของ Netty รหัสมีดังนี้:
ระดับสาธารณะ RPCencoder ขยาย Messagetobyteencoder {คลาสส่วนตัว <?> GenericClass; สาธารณะ rpCencoder (คลาส <?> genericClass) {this.genericClass = genericClass; } @Override โมฆะสาธารณะเข้ารหัส (channelHandlerContext CTX, Object in, bytebuf out) โยนข้อยกเว้น {ถ้า (genericclass.isinstance (in)) {byte [] data = serializationutil.serialize (in); out.writeInt (data.length); Out.writeBytes (ข้อมูล); -เขียนคลาสเครื่องมือ serializationutil และใช้ protostuff เพื่อใช้งานอนุกรม:
Public Class SerializationUtil {แผนที่คงที่ส่วนตัว <คลาส <?>, schema <? >> cachedschema = ใหม่พร้อมกันพร้อมกัน <> (); Objenesis แบบคงที่ส่วนตัว = Objenesisstd ใหม่ (TRUE); private serializationutil () {} @suppresswarnings ("ไม่ได้ตรวจสอบ") ส่วนตัวคงที่ <t> schema <t> getSchema (คลาส <t> cls) {schema <t> schema = (schema <t>) cachedschema.get (cls); if (schema == null) {schema = runtimeschema.createfrom (cls); if (schema! = null) {cachedschema.put (cls, schema); }} return schema; } @suppresswarnings ("ไม่ได้ตรวจสอบ") สาธารณะคงที่ <t> byte [] serialize (t obj) {class <t> cls = (คลาส <t>) obj.getclass (); LinkedBuffer buffer = linkedBuffer.Allocate (linkedBuffer.default_buffer_size); ลอง {schema <t> schema = getSchema (cls); ส่งคืน protostuffioutil.tobytearray (obj, schema, บัฟเฟอร์); } catch (exception e) {โยน unlueLstateException ใหม่ (e.getMessage (), e); } ในที่สุด {buffer.clear (); }} สาธารณะคงที่ <t> t deserialize (byte [] data, คลาส <t> cls) {ลอง {t message = (t) objenesis.newinstance (CLS); Schema <t> schema = getSchema (CLS); Protostuffioutil.mergefrom (ข้อมูล, ข้อความ, schema); ส่งคืนข้อความ; } catch (exception e) {โยน unlueLstateException ใหม่ (e.getMessage (), e); -ข้างต้นใช้ objenesis เพื่อสร้างอินสแตนซ์วัตถุซึ่งมีประสิทธิภาพมากกว่าการสะท้อนชวา
หมายเหตุ: หากคุณต้องการแทนที่เฟรมเวิร์กการทำให้เป็นอนุกรมอื่น ๆ เพียงแค่แก้ไข serializationutil แน่นอนวิธีที่ดีกว่าในการใช้งานคือการจัดเตรียมรายการการกำหนดค่าเพื่อตัดสินใจว่าจะใช้วิธีการแบบอนุกรมใด
ในการจัดการคำขอ RPC ใน RPChandler คุณเพียงแค่ต้องขยายคลาสนามธรรม SimpleChannelInboundHandler ของ Netty ของ Netty รหัสมีดังนี้:
ระดับสาธารณะ RPChandler ขยาย SimpleChannelInBoundHandler <RpCrequest> {ส่วนตัว logger สุดท้ายคงที่ logger = loggerFactory.getLogger (rpchandler.class); แผนที่สุดท้ายส่วนตัว <String, Object> Handlermap; สาธารณะ rpchandler (แผนที่ <สตริงวัตถุ> handlermap) {this.handlermap = handlermap; } @Override โมฆะสาธารณะ channelread0 (สุดท้าย channelhandlercontext ctx, คำขอ rpcrequest) โยนข้อยกเว้น {rpcresponse response = new rpcresponse (); Response.setRequestId (request.getRequestId ()); ลอง {object result = handle (คำขอ); Response.setResult (ผลลัพธ์); } catch (throwable t) {response.setError (t); } ctx.writeandflush (การตอบสนอง) .addlistener (channelfutureListener.close); } มือจับวัตถุส่วนตัว (คำขอ rpcrequest) พ่น trowable {string className = request.getClassName (); Object ServiceBean = handlermap.get (className); คลาส <?> serviceClass = serviceBean.getClass (); String methodName = request.getMetHodname (); คลาส <?> [] parameterTypes = request.getParameterTypes (); วัตถุ [] พารามิเตอร์ = request.getParameters (); /*วิธีการ = serviceClass.getMethod (MethodName, ParameterTypes); method.setAccessible (จริง); return method.invoke (servicebean, พารามิเตอร์);*/ fastclass servicefastclass = fastclass.create (serviceClass); FastMethod ServiceFastMethod = ServiceFastClass.getMethod (MethodName, ParameterTypes); ส่งคืน ServiceFastMethod.invoke (ServiceBean, พารามิเตอร์); } @Override void public excactioncaught (channelhandlercontext ctx, สาเหตุที่สามารถจดจำได้) {logger.error ("Server Catch Exception", สาเหตุ); ctx.close (); -เพื่อหลีกเลี่ยงปัญหาด้านประสิทธิภาพที่เกิดจากการใช้การสะท้อน Java เราสามารถใช้ API Reflection ที่จัดทำโดย CGLIB เช่น FastClass และ FastMethod ที่ใช้ด้านบน
ขั้นตอนที่ 7: กำหนดค่าไคลเอนต์
ใช้ไฟล์การกำหนดค่าสปริงเพื่อกำหนดค่าไคลเอนต์ RPC รหัส Spring.xml มีดังนี้:
<ถั่ว ... > <บริบท: สถานที่ตั้งสถานที่ตั้งตำแหน่ง = "classpath: config.properties"/> <!-กำหนดค่าส่วนประกอบการค้นพบบริการ-> <bean id = "servicediscovery"> <constructor-arg name = "registryaddress" value = "$ {registry.address}"/> </> <constructor-arg name = "servicediscovery" ref = "ServiceDiscovery"/> </ebean> </epeans>config.properties ให้การกำหนดค่าเฉพาะ:
# zookeeper server registry.address = 127.0.0.1: 2181
ขั้นตอนที่ 8: ใช้การค้นพบบริการ
ใช้ ZooKeeper เพื่อใช้ฟังก์ชั่นการค้นพบบริการดูรหัสต่อไปนี้:
Public Class ServicedIsCovery {ส่วนตัว Logger สุดท้ายคงที่ logger = loggerFactory.getLogger (servicediscovery.class); private countdownlatch latch = new countdownlatch (1); รายการผันผวนส่วนตัว <String> datalist = new ArrayList <> (); registryaddress สตริงส่วนตัว; Public ServiceDiscovery (String RegistryAddress) {this.registryAddress = RegistryAddress; ZooKeeper ZK = ConnectServer (); if (zk! = null) {watchNode (zk); }} สตริงสาธารณะ Discover () {String data = null; ขนาด int = datalist.size (); if (size> 0) {ถ้า (size == 1) {data = datalist.get (0); logger.debug ("ใช้เฉพาะข้อมูล: {}", ข้อมูล); } else {data = datalist.get (threadlocalrandom.current (). nextint (ขนาด)); logger.debug ("การใช้ข้อมูลแบบสุ่ม: {}", ข้อมูล); }} ส่งคืนข้อมูล; } zookeeper ส่วนตัว ConnectServer () {zookeeper zk = null; ลอง {zk = new zookeeper (registryAddress, constant.zk_session_timeout, new Watcher () {@Override กระบวนการโมฆะสาธารณะ (เหตุการณ์ดู Event) {ถ้า (Event.getState () == Event.Keeperstate.SyncConnected) {latch.countdown ();}}}}); latch.await (); } catch (ioexception | interruptedException e) {logger.error ("", e); } return zk; } โมฆะส่วนตัว watchNode (Zookeeper สุดท้าย zk) {ลอง {list <string> nodelist = zk.getchildren (constant.zk_registry_path, ผู้เฝ้าดูใหม่ () {@Override กระบวนการโมฆะสาธารณะ - รายการ <String> datalist = new ArrayList <> (); สำหรับ (โหนดสตริง: nodelist) {byte [] bytes = zk.getData (constant.zk_registry_path + "/" + โหนด, เท็จ, null); datalist.add (สตริงใหม่ (ไบต์)); } logger.debug ("Node Data: {}", Datalist); this.datalist = datalist; } catch (GeterException | interruptedException e) {logger.error ("", e); -ขั้นตอนที่ 9: การใช้ตัวแทน RPC
ที่นี่เราใช้เทคโนโลยีพร็อกซีแบบไดนามิกที่จัดทำโดย Java เพื่อใช้งานพร็อกซี RPC (แน่นอนมันสามารถนำไปใช้งานได้โดยใช้ CGLIB) รหัสเฉพาะมีดังนี้:
คลาสสาธารณะ rpcproxy {private string serveraddress; ServiceDiscovery Private ServiceDiscovery; สาธารณะ rpcproxy (String serveraddress) {this.serveraddress = serveraddress; } RPCPROXY สาธารณะ (ServiceCovery ServiceCovery) {this.serviceDiscovery = ServiceDiscovery; } @suppresswarnings ("ไม่ได้ตรวจสอบ") สาธารณะ <t> t สร้าง (คลาส <?> interfaceclass) {return (t) proxy.newproxyinstance (interfaceclass.getClassloader () คลาสใหม่ <?> [] {Interfaceclass} โยน {rpcrequest Request = ใหม่ rpcrequest (); request.setParameterTypes (method.getParameterTypes (); RPCCLIent Client = ใหม่ rpcclient (โฮสต์, พอร์ต); } else {return response.getResult (); - -ในการใช้ไคลเอนต์ RPC โดยใช้คลาส RPCCLient คุณจะต้องขยายคลาสนามธรรม SimpleChannelInboundhandler ที่จัดทำโดย Netty รหัสมีดังนี้:
ระดับสาธารณะ RPCCLIent ขยาย SimpleChannelInBoundHandler <RpCresponse> {ส่วนตัว Logger Logger สุดท้ายคงที่ = loggerFactory.getLogger (rpcclient.class); โฮสต์สตริงส่วนตัว พอร์ต int ส่วนตัว; การตอบสนอง RPCresponse ส่วนตัว วัตถุสุดท้ายส่วนตัว obj = วัตถุใหม่ (); สาธารณะ rpcclient (สตริงโฮสต์, พอร์ต int) {this.host = โฮสต์; this.port = พอร์ต; } @Override โมฆะสาธารณะ channelread0 (channelhandlercontext ctx, การตอบสนอง rpcresponse) โยนข้อยกเว้น {this.response = การตอบสนอง; ซิงโครไนซ์ (obj) {obj.notifyall (); // รับการตอบกลับปลุกเธรด}} @Override โมฆะสาธารณะข้อยกเว้น (channelhandlerContext CTX สาเหตุที่โยนได้) โยนข้อยกเว้น {logger.error ("ข้อยกเว้นการจับไคลเอ็นต์", สาเหตุ); ctx.close (); } สาธารณะ RPCresponse ส่ง (คำขอ RPCRequest) โยนข้อยกเว้น {EventLoopGroup Group = new NioEventLoopGroup (); ลอง {bootstrap bootstrap = new bootstrap (); bootstrap.group (กลุ่ม). Channel (niosocketChannel.class) .handler (ช่องใหม่ channelInitializer <OcketChannel> () {@Override โมฆะสาธารณะ initchannel (ช่อง Socketchannel) โยนข้อยกเว้น {channel.pipeline () .Addlast (ใหม่ rpcdecoder (rpcresponse.class)) // decode การตอบสนอง RPC (เพื่อจัดการกับการตอบกลับ) .Addlast (rpcclient.his); Channelfuture Future = bootstrap.connect (โฮสต์, พอร์ต) .sync (); future.channel (). writeandflush (คำขอ) .sync (); ซิงโครไนซ์ (obj) {obj.wait (); // ไม่ได้รับการตอบกลับทำให้เธรดรอ} ถ้า (ตอบกลับ! = null) {future.channel (). closeFuture (). sync (); } ตอบกลับการตอบกลับ; } ในที่สุด {group.shutdowngracefuly (); -ขั้นตอนที่ 10: ส่งคำขอ RPC
ใช้ Junit เพื่อเขียนการทดสอบหน่วยร่วมกับสปริงด้วยรหัสต่อไปนี้:
@runwith (springjunit4classrunner.class) @contextconfiguration (locations = "classpath: spring.xml") ชั้นเรียนสาธารณะ @Test สาธารณะโมฆะ helloTest () {helloService helloService = rpcproxy.create (helloService.class); สตริงผลลัพธ์ = helloService.hello ("โลก"); assert.assertequals ("Hello! World", ผลลัพธ์); -เรียกใช้การทดสอบหน่วยด้านบนและหากไม่มีอะไรเกิดขึ้นที่ไม่คาดคิดคุณควรเห็นแถบสีเขียว
สรุป
บทความนี้ใช้กรอบ RPC ที่มีน้ำหนักเบาผ่าน Spring + Netty + Protostuff + Zookeeper มันใช้สปริงเพื่อให้การฉีดขึ้นอยู่กับการพึ่งพาและการกำหนดค่าพารามิเตอร์ใช้ NetTy เพื่อใช้การส่งข้อมูล NIO ใช้ Protostuff เพื่อใช้การจัดลำดับวัตถุและใช้ Zookeeper เพื่อใช้การลงทะเบียนและการค้นหา การใช้เฟรมเวิร์กนี้สามารถปรับใช้บริการบนโหนดใด ๆ ในสภาพแวดล้อมแบบกระจาย ไคลเอนต์เรียกใช้การใช้งานเฉพาะของเซิร์ฟเวอร์ผ่านอินเทอร์เฟซระยะไกลแยกการพัฒนาของเซิร์ฟเวอร์และไคลเอนต์อย่างสมบูรณ์ซึ่งให้การสนับสนุนพื้นฐานสำหรับการใช้งานแอปพลิเคชันกระจายขนาดใหญ่
ภาคผนวก: การพึ่งพา Maven
<!-junit-> <predency> <sdeperency> <roupid> junit </groupId> <ratifactid> Junit </artifactId> <Sersion> 4.11 </เวอร์ชัน> <Scope> ทดสอบ </cope> </การพึ่งพา> <!-SLF4J-> <Sersion> 1.7.7 </Sident> </การพึ่งพาอาศัย> <!-ฤดูใบไม้ผลิ-> <การพึ่งพา> <roupId> org.springframework </groupId> <ratifactid> Spring-Context </artifactid> <Sersion> 3.2.12.Release </เวอร์ชัน> <ArtIfactId> การทดสอบสปริง </artifactid> <sersion> 3.2.12.release </เวอร์ชัน> <pope> ทดสอบ </cope> </percess> <!-netty-> <cendency> <roupid> io.netty </groupid> -> <predency> <roupId> com.dyuproject.protostuff </groupId> <ratifactid> protostuff-core </artifactid> <sersion> 1.0.8 </sention> </การพึ่งพาอาศัย> <!-Zookeeper-> <predency> <Sersion> 3.4.6 </Senture> </perctency> <!-Apache Commons Collections-> <การพึ่งพา> <roupId> org.apache.Commons </groupId> <ratifactId> Commons-Collections4 <ArtIfactId> objenesis </artifactid> <sersion> 2.1 </เวอร์ชัน> </การพึ่งพาอาศัย> <!-cglib-> <การพึ่งพา