QBIT Java Micorservices LIB Tutorials | เว็บไซต์ QBIT | QBIT ใช้ REAKT | QBIT ทำงานร่วมกับ vert.x | reakt vertx
Java Microservice Lib QBIT เป็น LIB การเขียนโปรแกรมปฏิกิริยาสำหรับการสร้าง Microservices - JSON, HTTP, WebSocket และ REST QBIT ใช้การเขียนโปรแกรมปฏิกิริยาเพื่อสร้างส่วนที่เหลือยืดหยุ่นและบริการบนคลาวด์ที่เป็นมิตรกับ WebSockets SOA พัฒนาขึ้นสำหรับมือถือและคลาวด์ ServiceDiscovery, Health, Reactive Statservice, เหตุการณ์, การเขียนโปรแกรมปฏิกิริยาแบบสำนวน Java สำหรับไมโครไซต์
มีคำถาม? ถามที่นี่: QBIT Google Group
ทุกอย่างเป็นคิว คุณมีทางเลือก คุณสามารถยอมรับและควบคุมมันได้ คุณสามารถปรับให้เหมาะสมสำหรับมัน หรือคุณสามารถซ่อนอยู่เบื้องหลัง abstractions QBIT เปิดคุณขึ้นไปมองสิ่งที่เกิดขึ้นและช่วยให้คุณสามารถดึงคันโยกได้โดยไม่ต้องขายวิญญาณของคุณ
QBIT เป็นห้องสมุดไม่ใช่กรอบ คุณสามารถผสมและจับคู่ qbit กับฤดูใบไม้ผลิ, guice ฯลฯ
ตอนนี้ QBIT รองรับสัญญา Reakt ที่ใช้งานได้สำหรับพร็อกซีไคลเอนต์ท้องถิ่นและระยะไกล สิ่งนี้ให้ API ที่คล่องแคล่วดีสำหรับการเขียนโปรแกรม Async
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();การโทรกลับ QBIT ตอนนี้ยัง reakt callbacks โดยไม่ทำลายสัญญา QBIT สำหรับการโทรกลับ
ดู REAKT INDICABLE PROMISES สำหรับรายละเอียดเพิ่มเติม
QBIT ถูกตีพิมพ์ให้กับ Maven Public Repo
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'นำไปใช้กับ บริษัท Fortune 100 ขนาดใหญ่หลายแห่ง ตอนนี้ QBIT ทำงานร่วมกับ VERTX (แบบสแตนด์อโลนหรือฝัง) นอกจากนี้คุณยังสามารถใช้ QBIT ในโครงการที่ไม่ใช่ QBIT เป็นเพียง LIB
Apache 2
QBIT มีบริการ inproc, Microservices REST และ Microservices WebSocket รวมถึง Bus Event Service Event Service (ซึ่งสามารถเป็นโมดูลหรือต่อแอป) รองรับคนงานและบริการในหน่วยความจำ
ก่อนที่เราจะอธิบายเพิ่มเติมนี่คือบริการตัวอย่างสองรายการ:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}ในตอนท้ายของวัน Qbit เป็นห้องสมุดง่าย ๆ ไม่ใช่กรอบ แอพของคุณไม่ใช่แอพ QBIT แต่เป็นแอพ Java ที่ใช้ QBIT LIB QBIT ช่วยให้คุณทำงานกับ Java Util พร้อมกันและไม่พยายามซ่อนมันจากคุณ แค่พยายามที่จะนำเอาออกไปจากมัน
เราได้ใช้เทคนิคใน Boon และ Qbit ที่ประสบความสำเร็จอย่างมากในแอพระดับสูงระดับสูงและมีประสิทธิภาพสูง เราช่วยลูกค้าจัดการโหลด 10 เท่าด้วย 1/10th เซิร์ฟเวอร์ของคู่แข่งโดยใช้เทคนิคใน QBIT QBIT เป็นเราป่วยจากการปรับคิวการปรับคิวและเธรด
แนวคิดสำหรับประโยชน์และ QBIT มักมาจากทั่วทั้งเว็บ เราทำผิดพลาด ชี้ให้เห็น ในฐานะนักพัฒนาของบุญและ QBIT เราเป็นเพื่อนนักเดินทาง หากคุณมีความคิดหรือเทคนิคที่คุณต้องการแบ่งปันเราก็ฟัง
แรงบันดาลใจที่ยิ่งใหญ่สำหรับ Boon/Qbit คือ Vertx, Akka, GO Channels, Objects Active, Threading Model Model, นักแสดงและเอกสารความเห็นอกเห็นใจเชิงกล
QBIT มีความคิดที่คล้ายกับกรอบงานมากมาย เราทุกคนกำลังอ่านเอกสารเดียวกัน QBIT ได้รับแรงบันดาลใจจากเอกสาร Lmax Disruptor และโพสต์บล็อกนี้เกี่ยวกับคิวการถ่ายโอนลิงค์กับ disruptor เรามีทฤษฎีบางอย่างเกี่ยวกับคิวที่โพสต์บล็อกเป็นแรงบันดาลใจให้เราลองพวกเขา ทฤษฎีเหล่านี้บางส่วนถูกนำไปใช้ในแบ็กเอนด์มิดเดิลแวร์ที่ใหญ่ที่สุดและแบรนด์ชื่อเป็นที่รู้จักกันทั่วโลก และทำให้ QBIT เกิด
QBIT ยังได้รับแรงบันดาลใจมากมายจากงานที่ยอดเยี่ยมที่ทำโดย Tim Fox บน Vertx โครงการแรกที่ใช้บางสิ่งที่อาจเรียกได้ว่า QBIT (แม้ว่า QBIT ต้น) กำลังใช้ VERTX บนเว็บ/ไมโครเซิร์ซบนมือถือสำหรับแอพที่อาจมีผู้ใช้ 80 ล้านคน มันเป็นประสบการณ์นี้กับ VertX และ QBIT ต้นที่นำไปสู่การพัฒนา QBIT และวิวัฒนาการ QBIT ถูกสร้างขึ้นบนไหล่ของยักษ์ (Netty/Vertx)
Spring Disruptor: ไม่คุณสามารถใช้ QBIT เพื่อเขียนปลั๊กอินสำหรับ Spring Disruptor ฉันคิดว่า แต่ QBIT ไม่ได้แข่งขันกับ Spring Disruptor Spring Boot/Spring MVC: ไม่เราใช้คำอธิบายประกอบเดียวกัน แต่ QBIT นั้นเหมาะสำหรับ Microservices ในหน่วยความจำความเร็วสูง มันเป็นเหมือน Akka มากกว่า Spring Boot QBIT มีชุดย่อยของคุณสมบัติของ Spring MVC ที่เหมาะสำหรับ microservices, เช่น, WebSocket RPC, REST, JSON MARSHALING ฯลฯ Akka: ไม่ Akka มีแนวคิดที่คล้ายกัน แต่พวกเขาใช้วิธีการที่แตกต่างกัน QBIT ให้ความสำคัญกับ Java และ Microservices (REST, JSON, WebSocket) มากกว่า Akka Lmax disruptor: ไม่ได้ในความเป็นจริงเราสามารถใช้ disruptor ได้ในคิวที่ QBIT ใช้ภายใต้ฝาครอบ
(เกณฑ์มาตรฐานก่อนถูกลบออกแล้วพวกเขาอยู่ที่นี่ Qbit ได้เร็วขึ้นมากการเปรียบเทียบ QBIT เป็นเป้าหมายที่เคลื่อนไหวในขณะนี้ลิงก์และรายงานจะถูกสร้างขึ้น)
ตัวอย่างรหัส
-
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)QBIT เป็นห้องสมุดคิวสำหรับ Microservices มันคล้ายกับโครงการอื่น ๆ อีกมากมายเช่น Akka, Spring Reactor ฯลฯ Qbit เป็นเพียงห้องสมุดที่ไม่ใช่แพลตฟอร์ม QBIT มีห้องสมุดที่จะให้บริการด้านหลังคิว คุณสามารถใช้คิว QBIT โดยตรงหรือคุณสามารถสร้างบริการ บริการ QBIT สามารถเปิดเผยได้โดย WebSocket, HTTP, HTTP PIPELINE และ Remoting ประเภทอื่น ๆ บริการใน QBIT เป็นคลาส Java ที่มีวิธีการที่จะดำเนินการหลังคิวการบริการ QBIT ใช้เกลียวโมเดลอพาร์ทเมนท์และคล้ายกับโมเดลนักแสดงหรือคำอธิบายที่ดีกว่าจะเป็นวัตถุที่ใช้งานอยู่ QBIT ไม่ได้ใช้ disruptor (แต่ทำได้) มันใช้คิว Java ปกติ QBIT สามารถทำทางเหนือของ 100 ล้านปิงปองโทรต่อวินาทีซึ่งเป็นความเร็วที่น่าทึ่ง (เห็นได้สูงถึง 200 ม.) QBIT ยังรองรับบริการการโทรผ่าน REST และ WebSocket QBIT เป็น microservices ในความรู้สึกของเว็บบริสุทธิ์: JSON, HTTP, WebSocket ฯลฯ QBIT ใช้การแบตช์ไมโครเพื่อกดข้อความผ่านท่อ (คิว, io, ฯลฯ ) เร็วขึ้นเพื่อลดการใช้ด้าย
QBIT เป็น Java Microservice LIB ที่สนับสนุน REST, JSON และ WebSocket มันเขียนใน Java แต่วันหนึ่งเราสามารถเขียนเวอร์ชันใน Rust หรือ Go หรือ C# (แต่นั่นต้องใช้เงินเดือนขนาดใหญ่)
Service Pojo (วัตถุ Java Old Old Old) หลังคิวที่สามารถรับวิธีการโทรผ่านการโทรหรือเหตุการณ์พร็อกซี (อาจมีหนึ่งเธรดการจัดการเหตุการณ์การโทรวิธีและการตอบสนองหรือสองครั้งสำหรับการโทรวิธีและเหตุการณ์และอื่น ๆ สำหรับการตอบกลับ อย่าปิดกั้นบริการ บริการสามารถใช้คำอธิบายประกอบสไตล์ Spring MVC REST เพื่อเปิดเผยตัวเองสู่โลกภายนอกผ่านทางพักผ่อนและ WebSocket
ServiceBundle pojos หลายคนอยู่เบื้องหลังหนึ่งคิวการตอบสนองและหลายคนได้รับคิว อาจมีหนึ่งเธรดสำหรับการตอบกลับทั้งหมดหรือไม่ พวกเขายังสามารถเป็นหนึ่งได้รับคิว
คิวเธรด ที่จัดการคิว รองรับการแบตช์ มันมีเหตุการณ์ที่ว่างเปล่าไปถึงจุดเริ่มต้นการเริ่มต้นใช้งานไม่ได้ใช้งาน คุณสามารถฟังกิจกรรมเหล่านี้ได้จากบริการที่อยู่ด้านหลังคิว คุณไม่จำเป็นต้องใช้บริการ คุณสามารถใช้ Direct ของคิว ใน QBIT คุณมีคิวผู้ส่งและคิวผู้รับ พวกเขาจะถูกแยกออกเพื่อรองรับการจับคู่ไมโคร
ServiceEndpointServer ServiceBundle ที่สัมผัสกับการสื่อสารที่เหลือและ WebSocket
Eventbus Eventbus เป็นวิธีส่งข้อความจำนวนมากไปยังบริการที่อาจมีการรวมกันอย่างหลวม ๆ
ClientProxy ClientProxy เป็นวิธีการเรียกใช้บริการผ่านอินเตอร์เฟส ASYNC บริการสามารถเป็น inProc (กระบวนการเดียวกัน) หรือ remoted ผ่าน WebSocket
QBIT ที่ไม่ปิดกั้น เป็น LIB ที่ไม่ปิดกั้น คุณใช้การโทรกลับผ่าน Java 8 Lambdas คุณยังสามารถส่งข้อความเหตุการณ์และรับการตอบกลับ การส่งข้อความถูกสร้างขึ้นในระบบเพื่อให้คุณสามารถประสานงานที่ซับซ้อนได้อย่างง่ายดาย QBIT ใช้วิธีการเชิงวัตถุเพื่อการพัฒนาบริการเพื่อให้บริการดูเหมือนบริการ Java ปกติที่คุณเขียนอยู่แล้ว แต่บริการอยู่เบื้องหลังคิว/เธรด นี่ไม่ใช่แนวคิดใหม่ Microsoft ทำสิ่งนี้ด้วย DCOM/COM และเรียกมันว่าวัตถุที่ใช้งานอยู่ Akka ทำกับนักแสดงและเรียกพวกเขาว่านักแสดงพิมพ์อย่างมาก แนวคิดที่สำคัญคือคุณจะได้รับความเร็วในการส่งข้อความแบบปฏิกิริยาและนักแสดง แต่คุณพัฒนาในแนวทาง OOP ตามธรรมชาติ QBIT ไม่ใช่คนแรก QBIT ไม่ใช่เพียงอย่างเดียว
ความเร็ว QBIT เร็วมาก มีพื้นที่มากสำหรับการปรับปรุง แต่แล้ว 200m+ TPS Inproc Ping Pong, 10m-20m+ TPS Event Bus, 500k TPS RPC เรียกผ่าน WebSocket/JSON ฯลฯ ต้องทำงานมากขึ้นเพื่อปรับปรุงความเร็ว แต่ตอนนี้มันเร็วพอที่เราจะมุ่งเน้นการใช้งานมากขึ้น การสนับสนุน JSON ใช้ประโยชน์โดยค่าเริ่มต้นซึ่งเร็วกว่า 4x เร็วกว่าตัวแยกวิเคราะห์ JSON อื่น ๆ สำหรับกรณีการใช้งาน REST/JSON, WebSocket/JSON
การเขียนโปรแกรมแบบปฏิกิริยา QBIT ให้ เครื่องปฏิกรณ์ เพื่อจัดการการโทรแบบ async สิ่งนี้จะช่วยให้สามารถจัดการการเรียกกลับในเธรดเดียวกันที่เรียกว่าและให้การจัดการหมดเวลาและการจัดการข้อผิดพลาด อ่านบทช่วยสอนเครื่องปฏิกรณ์เพื่อสร้างการเขียนโปรแกรมบริการไมโครปฏิกิริยา
การค้นพบบริการ ที่สร้างขึ้นเพื่อสนับสนุนการค้นพบบริการ ซึ่งรวมถึงการรวมเข้ากับกงสุล
Statservice ในตัวสนับสนุนสถิติ StatService สามารถรวมเข้ากับ StatsD (กราไฟท์, grafana, datadog ฯลฯ ) เพื่อเผยแพร่สถิติแบบพาสซีฟ หรือคุณสามารถสืบค้นเครื่องยนต์สถิติและตอบสนองต่อสถิติ (นับการกำหนดเวลาและระดับ) StatsService เป็นระบบสถิติปฏิกิริยาที่สามารถจัดกลุ่มได้ StatService มีปฏิกิริยาตอบสนองที่บริการของคุณสามารถเผยแพร่ไปยังมันและสอบถามและตอบสนองตามผลลัพธ์ คุณสามารถใช้สิ่งต่าง ๆ เช่นการ จำกัด อัตราและตอบสนองต่ออัตราที่เพิ่มขึ้นของบางสิ่ง ระบบ ServiceDiscovery รวมเข้ากับ HealthSystem และกงสุลเพื่อม้วนบริการภายในของคุณซึ่งประกอบขึ้นเป็นบริการไมโครและเผยแพร่คอมโพสิตของบริการไมโครของคุณไปยังจุดสิ้นสุด HTTP เดียวหรือสวิตช์ Dead Mans ในกงสุล (TTL)
พูดคุยราคาถูก มาดูรหัสกันบ้าง คุณสามารถรับรายละเอียดเดินผ่านในวิกิ เรามีเอกสารมากมายอยู่แล้ว
เราจะสร้างบริการที่เปิดเผยผ่าน REST/JSON
เพื่อสอบถามขนาดของรายการสิ่งที่ต้องทำ:
curl localhost:8080/services/todo-service/todo/countเพื่อเพิ่มรายการสิ่งใหม่
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todoเพื่อรับรายการสิ่งที่ต้องทำ
curl http://localhost:8080/services/todo-service/todo/ตัวอย่างสิ่งที่ต้องทำจะใช้และติดตามรายการสิ่งที่ต้องทำ
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;Todoservice ใช้คำอธิบายประกอบสไตล์สปริง MVC
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
} คุณสามารถโพสต์/ใส่ที่ไม่ใช่ JSON และคุณสามารถจับร่างกายเป็น String หรือเป็น byte[] หากตั้งค่าประเภทเนื้อหาเป็นอะไรก็ได้ยกเว้น application/json และร่างกายของคุณถูกกำหนดสตริงหรือไบต์ [] สิ่งนี้ใช้งานได้โดยอัตโนมัติ (ต้องตั้งค่าประเภทเนื้อหา)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
} โดยค่าเริ่มต้น QBIT จะส่ง 200 (ตกลง) สำหรับการโทรที่ไม่ได้เป็นกรด (การโทรที่มีการส่งคืนหรือการโทรกลับ) หากการดำเนินการที่เหลือไม่มีการส่งคืนหรือไม่มีการโทรกลับ QBIT จะส่ง 202 (ยอมรับ) อาจมีบางครั้งที่คุณต้องการส่ง 201 (สร้าง) หรือรหัสอื่น ๆ ที่ไม่ใช่ข้อยกเว้น คุณสามารถทำได้โดยการตั้ง code บน @RequestMapping โดยค่าเริ่มต้นรหัสคือ -1 ซึ่งหมายถึงใช้พฤติกรรมเริ่มต้น (200 สำหรับความสำเร็จ, 202 สำหรับข้อความทางเดียวและ 500 สำหรับข้อผิดพลาด)
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks สามารถใช้สำหรับบริการภายในได้เช่นกัน มักจะเป็นกรณีที่คุณใช้ backbuilder หรือเครื่องปฏิกรณ์ QBIT เพื่อจัดการการโทรบริการ
คุณไม่ต้องโทรกลับแบบฟอร์ม JSON คุณสามารถส่งคืนไบนารีหรือข้อความใด ๆ ได้โดยใช้ HttpBinaryResponse และ HttpTextResponse
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}ทำไมเราถึงเลือกคำอธิบายประกอบสไตล์ฤดูใบไม้ผลิ?
ตอนนี้เพิ่งเริ่มต้น
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}นั่นคือ นอกจากนี้ยังมีการสนับสนุน Box WebSocket พร้อมการสร้างพร็อกซีฝั่งไคลเอ็นต์เพื่อให้คุณสามารถโทรไปยังบริการในอัตราการโทรหลายล้านสายต่อวินาที
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}คุณสามารถเรียกใช้บริการ QBIT ผ่าน WebSocket Proxy ได้ตลอดเวลา ข้อได้เปรียบของพร็อกซี WebSocket คือช่วยให้คุณดำเนินการ 1M RPC+ ต่อวินาที (1 ล้านสายรีโมตทุกวินาที)
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );เอาต์พุตคือ 3
3
ด้านบนใช้อินเทอร์เฟซพร็อกซี WebSocket เพื่อเรียกบริการ Async
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}สร้างไคลเอนต์ Service WebSocket ที่ให้บริการค้นหา
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" ); ปัจจุบัน clientBuilder จะโหลดจุดสิ้นสุดบริการทั้งหมดที่ลงทะเบียนภายใต้ชื่อบริการและสุ่มเลือกหนึ่ง
ServiceDiscovery รวมถึงกงสุลโดยดูไฟล์ JSON บนดิสก์และ DNS เป็นเรื่องง่ายที่จะเขียนการค้นพบบริการของคุณเองเช่นกันและเสียบเข้ากับ QBIT
ในอนาคตเราสามารถเรียกใช้ roundrobin หรือเรียกใช้การโทรไปยังบริการ WebSocket และ/หรือให้อัตโนมัติล้มเหลวหากการเชื่อมต่อถูกปิด เราทำสิ่งนี้สำหรับรถบัสกิจกรรมที่ใช้การค้นพบบริการ แต่ยังไม่ได้อบลงในต้นขั้วไคลเอนต์ที่ใช้ WebSocket
ตัวอย่างไคลเอนต์ล่าสุดใช้ WebSocket คุณสามารถใช้ส่วนที่เหลือและใช้พารามิเตอร์ URI ที่เราตั้งค่า ส่วนที่เหลือเป็นสิ่งที่ดี แต่จะช้ากว่าการรองรับ WebSocket
QBIT จัดส่งด้วยไคลเอนต์ HTTP ตัวเล็ก ๆ ที่ดี เราสามารถใช้มันได้
คุณสามารถใช้เพื่อส่งข้อความ Async และ WebSocket ด้วยไคลเอนต์ HTTP
ที่นี่เราจะใช้ไคลเอนต์ HTTP เพื่อเรียกใช้วิธีระยะไกลของเรา:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );ผลลัพธ์คือ 4
4
คุณยังสามารถเข้าถึงบริการจาก Curl
$ curl http://localhost:7000/services/adder-service/add/2/2ดูตัวอย่างเต็มรูปแบบนี้ที่นี่: QBIT Microservice เริ่มต้นใช้งานการสอน
qbit uri params และ websocket proxy client
QBIT มีห้องสมุดสำหรับการทำงานกับและเขียน microservices async ที่มีน้ำหนักเบาและใช้งานสนุก
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
ตอนนี้หยุดเซิร์ฟเวอร์และไคลเอนต์ ค่อนข้างง่ายใช่มั้ย?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
โฟกัสใช้งานง่ายและใช้ Java 8 Lambdas สำหรับการโทรกลับดังนั้นรหัสจึงแน่นและเล็ก
ค้นหาข้อมูลเพิ่มเติมเกี่ยวกับการสนับสนุน WebSocket สไตล์ Microservice ของ QBIT ได้ที่นี่
ตอนนี้ลองใช้ไคลเอนต์ HTTP ของเรากันเถอะ
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();คุณเพียงแค่ผ่าน URL พอร์ตแล้วโทรเริ่ม
ตอนนี้คุณสามารถเริ่มส่งคำขอ HTTP
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );การตอบสนอง HTTP นั้นมีผลลัพธ์จากเซิร์ฟเวอร์
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}มีวิธีการช่วยในการซิงค์ http รับสาย
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
วิธีการ Puts เป็นวิธีการของผู้ช่วยที่ใช้ระบบ.
ห้าพารามิเตอร์แรกได้รับการคุ้มครอง เกินห้าคุณต้องใช้ httpbuilder
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );มีการเรียก Async เพื่อรับเช่นกัน
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[ค้นหาเพิ่มเติมเกี่ยวกับไคลเอนต์ HTTP ที่ใช้งานง่ายและใช้งานง่ายที่นี่] (https://github.com/Advantage/QBIT/WIKI/%5BDOC%5D-USING-QBIT-MICROSERVICE-LIB'S-httpClient-get -et-al, -json, -java-8-Lambda)
QBIT อนุญาตให้บริการที่อยู่เบื้องหลังคิวในการดำเนินการเช่นกัน
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());มีการเขียนแบบละเอียดเกี่ยวกับบริการในโปรแกรม
QBIT Event Bus ตัวอย่างรายละเอียดเพิ่มเติม
QBIT ยังมีรถบัสบริการ ตัวอย่างนี้เป็นตัวอย่างบริการผลประโยชน์ของพนักงาน
เรามีสองช่อง
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
วัตถุของพนักงานมีลักษณะเช่นนี้:
public static class Employee {
final String firstName ;
final int employeeId ;ตัวอย่างนี้มีสามบริการ: EmployeeHiringservice, Benefitservice และ Payrollservice
บริการเหล่านี้เป็นบริการ inproc QBIT รองรับ WebSocket, HTTP และ REST Remote Services เช่นกัน แต่ตอนนี้เรามามุ่งเน้นไปที่บริการ INPROC หากคุณเข้าใจ Inproc คุณจะเข้าใจระยะไกล
EmployeeHiringservice ยิงออกจากเหตุการณ์ไปยังบริการอีกสองบริการ
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}ขอให้สังเกตว่าเราเรียก SendArray เพื่อให้เราสามารถส่งพนักงานและเงินเดือนของพวกเขา ผู้ฟังสำหรับ PAYROLL_ADJUSTMENT_CHANNEL จะต้องจัดการทั้งพนักงานและ INT ที่แสดงถึงเงินเดือนพนักงานใหม่ นอกจากนี้คุณยังสามารถใช้พร็อกซีกิจกรรมบัสเพื่อให้คุณไม่ต้องโทรไปยังรถบัสกิจกรรมเลย
Benefitsenservice รับฟังสำหรับพนักงานใหม่ที่ได้รับการว่าจ้างเพื่อให้สามารถลงทะเบียนเข้าสู่ระบบผลประโยชน์ได้
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}พ่อต้องได้รับเงิน
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}พนักงานเป็นวัตถุของพนักงานจากพนักงานบริการพนักงาน
เพื่อให้คุณได้รับผลประโยชน์และชำระเงิน!
ค้นหารายละเอียดเพิ่มเติมที่นี่:
QBIT Event Bus ตัวอย่างรายละเอียดเพิ่มเติม
คุณสามารถกำหนดอินเทอร์เฟซของคุณเองไปยังบัสกิจกรรมและคุณสามารถใช้รถบัสกิจกรรมของคุณเองกับ QBIT แต่ละโมดูลในบริการของคุณสามารถมีบัสเหตุการณ์ภายในของตัวเอง
หากต้องการเรียนรู้เพิ่มเติมอ่าน: QBIT Microservice ทำงานร่วมกับรถบัสส่วนตัวและ QBIT Java Microservice LIB โดยใช้อินเทอร์เฟซของคุณเองไปยังบัสกิจกรรม
เพื่อให้เข้าใจ QBIT อย่างแท้จริงเราต้องเข้าใจแนวคิดของการโทรกลับ
การโทรกลับเป็นวิธีที่จะได้รับการตอบกลับแบบ async ใน QBIT
คุณเรียกวิธีการบริการและโทรกลับ
พร็อกซีไคลเอนต์สามารถโทรกลับได้:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}การโทรกลับเป็นผู้บริโภค Java 8 ที่มีการจัดการข้อผิดพลาดพิเศษเพิ่มเติม
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}บริการที่สามารถบล็อกควรใช้การโทรกลับ ดังนั้นหากตัวโหลดที่ถูกบล็อกในตัวอย่างต่อไปนี้ควรใช้การโทรกลับแทนการส่งคืนค่า
คำแนะนำชั้นเรียนสาธารณะบริการ {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
} มาแกล้งทำเป็นว่า loadUser จะต้องดูในแคชท้องถิ่นและหากไม่พบผู้ใช้ให้ดูในแคชปิดกองและหากไม่พบมันจะต้องขอให้ผู้ใช้จากผู้ใช้บริการซึ่งจะต้องตรวจสอบแคชและอาจกลับไปโหลด ข้อมูลผู้ใช้จากฐานข้อมูลหรือจากบริการอื่น ๆ กล่าวอีกนัยหนึ่ง loadUser อาจบล็อก IO
ลูกค้าของเราไม่ได้บล็อก แต่บริการของเราทำ กลับไปที่ RecommendationService ของเรา หากเราได้รับแคชจำนวนมากสำหรับการโหลดของผู้ใช้บางทีบล็อกอาจไม่นานขนาดนั้น แต่มันจะอยู่ที่นั่นและทุกครั้งที่เราต้องทำผิดพลาดในผู้ใช้ระบบทั้งหมดจะถูก gumm สิ่งที่เราต้องการทำได้คือถ้าเราไม่สามารถจัดการกับคำขอคำ UserDataService ได้ เมื่อการโทรกลับ Async กลับมาแล้วเราจะจัดการคำขอนั้น ในช่วงเวลานั้นเราจัดการรายการคำแนะนำคำขอให้เร็วที่สุดเท่าที่จะทำได้ เราไม่เคยบล็อก
ลองมาเยี่ยมบริการอีกครั้ง สิ่งแรกที่เราจะทำคือทำให้วิธีการบริการกลับมา ก่อนที่เราจะทำเช่นนั้นให้กำหนดกฎบางอย่าง
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {ตอนนี้เรากำลังโทรกลับและเราสามารถตัดสินใจได้เมื่อเราต้องการจัดการคำขอสร้างคำแนะนำนี้ เราสามารถทำได้ทันทีหากมีข้อมูลผู้ใช้ที่เราต้องการคือในหน่วยความจำหรือเราสามารถล่าช้าได้
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
} ขอให้สังเกตว่าหากผู้ใช้พบในแคชเราเรียกใช้กฎคำแนะนำของเราในหน่วยความจำและโทรกลับ recommendationsCallback.accept(runRulesEngineAgainstUser(user))
ส่วนที่น่าสนใจคือเราจะทำอย่างไรถ้าไม่มีผู้ใช้โหลด
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
... ที่นี่เราใช้การโทรกลับเพื่อโหลดผู้ใช้และเมื่อผู้ใช้โหลดเราจะเรียก handleLoadFromUserDataService ซึ่งเพิ่มการจัดการบางอย่างเกี่ยวกับการจัดการการโทรกลับเพื่อให้เรายังสามารถจัดการการโทรนี้ได้
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...การใช้แลมบ์ดาเช่นนี้ทำให้รหัสอ่านและสั้นขึ้น แต่โปรดจำไว้ว่าอย่านิพจน์แลมบ์ดารังอย่างลึกล้ำหรือคุณจะสร้างฝันร้ายการบำรุงรักษารหัส ใช้พวกเขาอย่างรอบคอบ
สิ่งที่เราต้องการคือการจัดการคำขอสำหรับคำแนะนำหลังจากระบบบริการผู้ใช้โหลดผู้ใช้จากร้านค้า
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
} ส่วนสำคัญคือทุกครั้งที่เราได้รับการโทรกลับจาก UserDataService เราจะดำเนินการตามกฎการแนะนำ CPU แบบเข้มข้นของ CPU และโทรกลับผู้โทรของเรา ไม่แน่นอนสิ่งที่เราทำคือ enqueue unnable บนคิวการโทรกลับของเราและหลังจากนั้นเราจะวนซ้ำผ่านสิ่งเหล่านั้น แต่เมื่อไหร่?
RecommendationService สามารถแจ้งเตือนได้เมื่อคิวว่างเปล่ามันได้เริ่มต้นแบทช์ใหม่และเมื่อถึงขีด จำกัด ของแบทช์ นี่เป็นช่วงเวลาที่ดีในการจัดการการโทรกลับจาก UserDataService
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}เป็นสิ่งสำคัญที่ต้องจำไว้เมื่อจัดการการโทรกลับจาก microservice อื่นที่คุณต้องการจัดการการโทรกลับจากบริการอื่น ๆ ก่อนที่คุณจะจัดการคำขอที่กำลังดำเนินอยู่มากขึ้นจากลูกค้าของคุณ โดยพื้นฐานแล้วคุณมีลูกค้าที่รออยู่ (async รออยู่ แต่ยัง) และลูกค้าเหล่านี้อาจเป็นตัวแทนของการเชื่อมต่อ TCP/IP แบบเปิดเช่นการโทร http ดังนั้นจึงเป็นการดีที่สุดที่จะปิดพวกเขาก่อนที่จะจัดการคำขอเพิ่มเติมและเหมือนที่เราบอกว่าพวกเขากำลังรออยู่แล้ว รอบ ๆ ด้วยการเชื่อมต่อแบบเปิดสำหรับผู้ใช้ในการโหลดแบบฟอร์มบริการผู้ใช้
หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการโทรกลับ Plesae อ่าน [QBIT Java Microservice LIB การโทรกลับพื้นฐาน] ([cut Rough Cut] QBIT Microservice LIB ทำงานกับการโทรกลับ)
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...คุณสามารถเขียนคนงานที่มีเศษ (สำหรับในหน่วยความจำ, กระทู้ปลอดภัย, บริการแบบเร่งรัดซีพียู) หรือคนงานสำหรับ IO หรือพูดคุยกับบริการต่างประเทศหรือรถบัสต่างประเทศ
นี่คือตัวอย่างที่ใช้กลุ่มพนักงานที่มีพนักงานบริการสามคนอยู่ในนั้น:
สมมติว่าคุณมีบริการที่ทำอะไรบางอย่าง:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}ตอนนี้นี่เป็น IO บางประเภทและคุณต้องการให้ธนาคารกำลังวิ่งเหล่านี้ไม่ใช่แค่หนึ่งเดียวเพื่อให้คุณสามารถทำ io ได้ในแบบคู่ขนาน หลังจากการทดสอบประสิทธิภาพคุณพบว่าสามคนคือหมายเลขเวทย์มนตร์
คุณต้องการใช้ API ของคุณเพื่อเข้าถึงบริการนี้:
public interface MultiWorkerClient {
void doSomeWork (...);
}ตอนนี้เรามาสร้างธนาคารของสิ่งเหล่านี้และใช้มัน
ก่อนอื่นสร้างบริการ QBIT ซึ่งเพิ่มเธรด/คิว/microbatch
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();ตอนนี้เพิ่มลงในวัตถุผู้ให้บริการ
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workersคุณสามารถเพิ่มบริการ pojos และผู้บริโภควิธีการ dispatchers วิธีการไปยังชุดบริการ ชุดบริการเป็นจุดรวมเข้ากับ QBIT
มาเพิ่มพนักงานบริการใหม่ของเรากันเถอะ ผู้ให้บริการเป็น ServiceMethodDispatcher
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();เราอาจจะเพิ่มวิธีการช่วยในชุดบริการเพื่อให้สิ่งเหล่านี้ส่วนใหญ่สามารถเกิดขึ้นได้ในการโทรครั้งเดียว
ตอนนี้คุณสามารถเริ่มใช้คนงานของคุณ
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );ตอนนี้คุณสามารถใช้ Spring หรือ Guice เพื่อกำหนดค่าผู้สร้างและชุดบริการ แต่คุณสามารถทำได้เช่นเดียวกับข้างต้นซึ่งเป็นสิ่งที่ดีสำหรับการทดสอบและทำความเข้าใจกับ qbit internals
QBIT ยังรองรับแนวคิดของบริการที่มีการให้บริการซึ่งเป็นสิ่งที่ดีสำหรับทรัพยากรการให้ข้อมูลเช่น CPU (เรียกใช้เอ็นจิ้นกฎในแต่ละ CPU Core สำหรับเอ็นจิ้นแนะนำผู้ใช้)
QBIT ไม่ทราบวิธีการให้บริการของคุณคุณต้องให้คำใบ้ คุณทำสิ่งนี้ผ่านกฎชาร์ด
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}เราทำงานในแอพที่อาร์กิวเมนต์แรกสำหรับบริการคือชื่อผู้ใช้และจากนั้นเราใช้สิ่งนั้นเพื่อเรียกใช้การโทรไปยังซีพียูที่เข้มข้นในหน่วยความจำ เทคนิคนี้ใช้งานได้ -
คลาส Serviceworkers มีวิธีการสร้างกลุ่มคนงาน
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}หากต้องการใช้คุณเพียงแค่ผ่านคีย์ Shard เมื่อคุณสร้างพนักงานบริการ
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});จากนั้นเพิ่มบริการของคุณในองค์ประกอบของผู้ให้บริการ
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}จากนั้นเพิ่มลงในชุดบริการเหมือนก่อน
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();จากนั้นใช้มัน:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}ShardonbeanPath ช่วยให้คุณสร้างสายนำทางถั่วที่ซับซ้อนและใช้คุณสมบัติของมันเพื่อ Shard บน
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );อ่านเพิ่มเติมเกี่ยวกับบริการการให้บริการและพนักงานบริการที่นี่
คุณสามารถค้นหาได้มากขึ้นในวิกิ ยังติดตามการกระทำ เรายุ่งกับบีเว่อร์ qbit microservice lib สำหรับ java - json, rest, websocket