Qbit Java Micorservices lib教程| QBIT网站| Qbit使用Reakt | QBIT与Vert.x |一起使用Reakt Vertx
Java微服务自由。 QBIT是用于构建微服务的反应性编程LIB -JSON,HTTP,WebSocket和REST。 QBIT使用反应性编程来构建弹性休息和基于Websocket的云友好,Web服务。 SOA用于移动和云。维修,健康,反应性统计服务,事件,用于微服务的Java惯用反应性编程。
有问题吗?在这里询问:QBIT Google Group。
一切都是队列。你有选择。您可以拥抱并控制它。您可以为此进行优化。或者,您可以躲在抽象后面。 QBIT让您窥视正在发生的事情,并让您在不出售灵魂的情况下拉动一些杠杆。
Qbit是库而不是框架。您可以将Qbit与春季,Guice等混合匹配。
QBIT现在支持REAKT引用的本地和远程客户端代理的承诺。这为异步编程提供了一个不错的流利API。
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();QBIT回调现在也已成为REAKT回调,而无需打破QBIT合同以进行回调。
有关更多详细信息,请参见Reakt引起的承诺。
QBIT已发布给Maven Public Repo。
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'部署在几家大型财富100大公司中。 QBIT现在与VERTX(独立或嵌入式)一起使用。您也可以在非QBIT项目上使用QBIT,这只是一个LIB。
Apache 2
QBIT具有INPROC服务,REST微服务和Websocket微服务以及程序内服务事件总线(可以是每个模块或每个应用程序)。它支持工人和内存服务。
在我们描述更多之前,这里有两个示例服务:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}归根结底,Qbit是一个简单的库而不是一个框架。您的应用不是QBIT应用程序,而是使用QBIT LIB的Java应用程序。 QBIT允许您与Java Util并发合作,并且不会努力向您隐藏它。只是试图将刺痛从中取出。
我们已经在Boon和QBIT中使用了技术,在高端高性能,高尺度的应用程序中取得了巨大成功。我们用QBIT中的技术帮助客户通过1/10的竞争对手服务器来处理10倍负载。 Qbit是我们厌倦了手工调整队列访问和线程。
Boon和Qbit的想法通常来自网络。我们犯错。指出他们。作为Boon和Qbit的开发商,我们是旅行者。如果您想分享一个想法或技术,我们会听。
Boon/Qbit的一个很大的灵感是Vertx,Akka,Go频道,活动对象,公寓模型螺纹,演员和机械同情论文。
Qbit的想法与许多框架相似。我们都在阅读相同的论文。 QBIT从LMAX DISPRUEATER论文中获得了启发,以及有关链接传输队列与破坏者的博客文章。我们有一些关于队列的理论,博客文章启发了我们尝试一下。其中一些理论部署在一些最大的中间件后端,其名称品牌在世界范围内广为人知。因此Qbit诞生了。
Qbit还为蒂姆·福克斯(Tim Fox)在VERTX上完成的伟大工作汲取了很多灵感。第一个使用实际上称为QBIT的项目(尽管早期QBIT)是在Web/Mobile Microservice上使用VERTX来用于可能有8000万用户的应用程序。正是这种在VERTX和早期QBIT的经历导致了QBIT的发展和进化。 QBIT建立在巨人(Netty/Vertx)的肩膀上。
春季破坏者:不。我想您可以使用Qbit为弹簧破坏者编写插件,但是QBIT并未与弹簧破坏者竞争。春季启动/春季MVC:否。我们使用相同的注释,但QBIT适用于高速内存微服务。它更像是Akka,而不是Spring Boot。 QBIT具有仅适用于微服务的Spring MVC功能的子集,即,Websocket RPC,REST,JSON MASSHALING等。AKKA:不,也许。 Akka具有类似的概念,但他们采用了不同的方法。 QBIT比Akka更专注于Java和Microservices(REST,JSON,Websocket)。 lmax颠覆器:不。实际上,我们可以使用disruptor作为Qbit在盖下使用的队列。
(已经删除了早期基准。它们在这里。QBIT的速度更快。基准测试QBIT目前是一个移动的目标。将创建链接和报告。)
代码示例
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)QBIT是用于微服务的排队库。它类似于其他许多项目,例如Akka,Spring Reactor等。QBIT只是一个库而不是平台。 Qbit有图书馆可以在队列后面提供服务。您可以直接使用QBIT队列,也可以创建服务。 QBIT服务可以由Websocket,HTTP,HTTP管道和其他类型的远程服务公开。 QBIT中的服务是Java类,其方法是在服务队列后面执行的。 QBIT实施公寓模型螺纹,与Actor模型相似,或者更好的描述将是活动对象。 QBIT不使用破坏者(但可以)。它使用常规的Java队列。 Qbit可以在每秒1亿次乒乓球的北部进行,这是一个惊人的速度(高达200m)。 QBIT还通过REST和Websocket支持呼叫服务。 QBIT是纯Web意义上的微服务:JSON,HTTP,WebSocket等。QBIT使用微批次来通过管道推动消息(队列,IO等)更快地减少线程交换。
QBIT是支持REST,JSON和WebSocket的Java微服务LIB。它是用Java编写的,但有一天我们可以用Rust或Go或C#写一个版本(但这需要大的发薪日)。
服务pojo(普通旧的java对象)在队列后面可以通过代理呼叫或事件接收方法调用(可能有一个线程管理事件,方法呼叫和两个响应或两个用于方法呼叫和事件,而另一个用于响应除非响应块,否则请勿阻止服务。服务可以使用Spring MVC样式REST注释通过Rest和Websocket暴露于外界。
ServiceBundle在一个响应队列后面的许多Pojos,许多Pojos都会收到队列。所有响应是否都可能有一个线程。他们也可以是一个接收队列。
排队管理队列的线程。它支持批处理。它有空的事件,到达,到达Limit,startBatch,闲置。您可以从排队后面的服务中收听这些事件。您不必使用服务。您可以使用队列的直接。在QBIT中,您有发件人的队列和接收器队列。它们分开以支持微批量。
ServiceEndPointServer ServiceBundle,可暴露于REST和WESTOCKECT通信。
EventBus EventBus是一种向可能松散耦合的服务发送大量消息的方式。
ClientProxy ClientProxy是通过异步接口调用服务的一种方式,服务可以是INPROC(相同的过程)或通过WebSocket远离。
非阻滞QBIT是一种非阻滞性LIB。您可以通过Java 8 Lambdas使用回调。您还可以发送事件消息并获得答复。消息传递已内置在系统中,因此您可以轻松地协调复杂的任务。 QBIT采用以对象为导向的服务开发方法,因此服务看起来像您已经编写的普通Java服务,但是服务在队列/线程后面存在。这不是一个新概念。 Microsoft使用DCOM/COM进行了此操作,并将其称为Active对象。 Akka与演员一起做这件事,并称他们为强烈打字的演员。重要的概念是,您可以获得反应性和演员风格的消息传递的速度,但以自然的OOP方法发展。 Qbit不是第一个。 Qbit并不是唯一的。
QBIT非常快。当然有很多改进的空间。但是,已经200m+ TPS Inproc ping Pong,10m-20m+ TPS事件总线,500K TPS RPC通过WebSocket/JSOND呼叫。 JSON支持使用BOON默认情况下,其剩下的JSON/JSON,WebSocket/JSON用例的其他JSON解析器的速度最高4倍。
反应性编程QBIT提供了一个反应器来管理异步调用。这允许在称呼它们的同一线程上处理回调,并提供超时和错误处理。读取用于创建反应性微服务编程的反应器教程
服务发现,以支持服务发现的支持。这包括与领事的集成。
STATSERVICE构建以支持统计数据。 STATSERVICE可以与Statsd (Graphite,Grafana,DataDog等)集成以发布被动统计数据。或者,您可以查询统计引擎并对统计数据(计数,时间和级别)做出反应。 STATSSERVICE是一个可以聚类的反应性统计系统。 STATSERVICE具有反应性,因为您的服务可以发布并根据结果进行查询并做出反应。您可以实施诸如限制速率的事情,并对提高的事物做出反应。 Serviciscovery系统与Healthsystem集成在一起,并汇总您的每项内部服务,这些内部服务构成了您的微服务,并将您的微服务的复合材料发布到单个HTTP端点或领事(TTL)中的Dead Mans Switch。
谈话很便宜。让我们看一些代码。您可以在Wiki中进行详细的步行。我们已经有很多文档了。
我们将创建通过REST/JSON暴露的服务。
查询待办事项列表的大小:
curl localhost:8080/services/todo-service/todo/count添加一个新的汤托物品。
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todo获取待办事项清单
curl http://localhost:8080/services/todo-service/todo/待办事项将使用和跟踪待办事项。
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;TodoService使用Spring MVC样式注释。
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}您可以发布/放置非json,并且可以将其捕获为String或byte[] 。如果内容类型设置为application/json ,并且您的身体被定义为字符串或字节[]。这会自动起作用。 (必须设置内容类型。)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}默认情况下,QBIT将发送200 (确定)以进行非空隙呼叫(带有返回或回调的呼叫)。如果其余操作没有返回或没有回调,则QBIT发送202 (接受)。有时候,您可能要发送201(创建)或其他不例外的代码。您可以通过在@RequestMapping上设置code来做到这一点。默认情况下,代码为-1,这意味着使用默认行为(成功的200,单向消息202,错误500)。
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks也可用于内部服务。通常,您使用回调布置器或QBIT反应器来管理服务呼叫。
您不必返回JSON表格rest呼叫。您可以使用HttpBinaryResponse和HttpTextResponse返回任何二进制或任何文本。
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}我们为什么选择春季风格的注释?
现在就开始。
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}就是这样。还提供了带有客户端代理生成的Box Websocket支持,因此您可以以每秒数百万个电话的速度拨打服务。
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}您始终可以通过Websocket代理调用QBIT服务。 WebSocket代理的优点是,它允许您执行1M RPC+一秒钟(每秒100万个遥控电话)。
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );输出为3。
3
以上使用WebSocket代理接口来调用服务异步。
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}创建Websocket服务客户端的客户端口。
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );当前, clientBuilder将加载以服务名称注册的所有服务端点,并随机选择一个。
ServiceScovery包括基于领事的,在磁盘上观看JSON文件和DNS。也很容易编写自己的服务发现并将其插入QBIT。
将来,如果连接已关闭,我们可以通话或拨打Websocket服务和/或提供自动失败。我们为使用服务发现的事件总线这样做,但尚未烘烤到基于Websocket的客户端存根中。
最后一个客户端示例使用websocket。您也可以只使用REST,然后实际使用我们设置的URI参数。休息很好,但是比WebSocket的支持要慢。
QBIT与一个漂亮的小型HTTP客户端发货。我们可以使用它。
您可以使用它与HTTP客户端一起发送异步电话和Websocket消息。
在这里,我们将使用HTTP客户端调用我们的远程方法:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );输出为4。
4
您也可以从Curl访问服务。
$ curl http://localhost:7000/services/adder-service/add/2/2在此处查看此完整示例:QBIT微服务入门教程。
QBIT URI参数和WebSocket代理客户端
QBIT有一个可以使用和编写异步微服务的库,使用的是轻巧且有趣的。
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
现在停止服务器和客户端。非常容易吗?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
重点是易用性,并使用Java 8 lambdas进行回调,因此代码紧密而小。
在此处查找有关QBIT的微服务样式Websocket支持的更多信息
现在,让我们尝试一下我们的HTTP客户端。
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();您只需通过URL,端口,然后致电启动即可。
现在,您可以开始发送HTTP请求。
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );HTTP响应仅包含服务器的结果。
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}有同步HTTP GET呼叫的辅助方法。
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
puts方法是助手方法,它或多或少或多或少。
前五个参数被涵盖。超过五个,您必须使用httpbuilder。
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );也有异步呼吁获得。
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[在此处找到有关易于使用的易于使用的有关易于使用的易于使用的微服务http client](https://github.com/advantage/qbit/qbit/wiki/%5bdoc%5d-usis-roservice-microservice-microservice-lib's-httpclient-get-post,post,post, -et-al,-json,-java-8-lambda)。
QBIT允许在程序内运行队列后面的服务。
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());正在编写有关程序内服务的详细教程。
QBIT事件总线更详细的示例
QBIT还拥有服务事件总线。此示例是一个员工福利服务示例。
我们有两个渠道。
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
员工对象看起来像:
public static class Employee {
final String firstName ;
final int employeeId ;此示例有三个服务:员工HiringService,Benefitservice和PayrollService。
这些服务是INPROC服务。 QBIT也支持WebSocket,HTTP和REST远程服务,但就目前而言,让我们专注于INPROC服务。如果您了解INPROC,那么您将了解远程。
员工Hiringservice实际上向其他两项服务发射了事件。
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}请注意,我们致电Sendarray,以便我们派遣员工及其工资。 PayRoll_Adjustment_Channel的听众将必须同时处理代表新员工薪水的INT。您也可以使用Event Bus代理,因此您根本不必拨打活动总线。
福利服务聆听被雇用的新员工,因此可以将他们纳入福利系统。
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}爸爸需要获得报酬。
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}员工是员工对象的雇员对象。
这样您就可以获得自己的福利并付款!
在此处找到更多详细信息:
QBIT事件总线更详细的示例
您可以将自己的接口定义到活动总线,可以使用QBIT自己的事件总线。您服务中的每个模块都可以拥有自己的内部活动总线。
要了解更多阅读:QBIT微服务与私人事件总线和QBIT Java微服务LIB一起使用您自己的活动总线。
为了真正掌握Qbit,必须掌握回调的概念。
回调是在QBIT中获得异步响应的一种方式。
您调用服务方法,然后回电。
客户代理可以有回调:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}回调是Java 8消费者,具有一些可选的额外错误处理。
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}可以阻止的服务应使用回调。因此,如果在下面的示例中阻止了加载器,则应真正使用回调而不是返回值。
公共类建议服务{
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
}让我们假装loadUser必须在本地缓存中查看,如果找不到用户,请查看离子缓存,如果找不到的话,必须从UserService中询问用户,该用户必须检查其缓存,也许是后备来自数据库或其他服务的用户数据。换句话说, loadUser可能会阻止IO。
我们的客户不会阻止,但我们的服务确实可以。回到我们的RecommendationService 。如果我们为用户负载获得了很多缓存命中,也许块不会那么长,但是它会在那里,每次我们必须在用户中犯错时,整个系统都会被填充。我们想做的是,如果我们无法处理推荐请求,我们就可以继续对UserDataService进行异步。当该异步回调返回时,我们就会处理该请求。同时,我们尽快处理建议列表请求。我们永远不会阻止。
因此,让我们重新审视服务。我们要做的第一件事是让服务方法进行回调。在这样做之前,让我们制定一些规则。
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {现在,我们正在回电,我们可以决定何时要处理此推荐生成请求。如果我们需要的用户数据是内存的,或者我们可以延迟它,我们可以立即执行此操作。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}请注意,如果在缓存中找到了用户,我们将在内存中运行建议规则,并立即致电callback recommendationsCallback.accept(runRulesEngineAgainstUser(user)) 。
有趣的部分是如果没有加载用户,我们该怎么办。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...在这里,我们使用回调来加载用户,当用户加载时,我们调用handleLoadFromUserDataService ,该fromuserdataservice添加了有关处理回调的一些管理,以便我们仍然可以处理此调用,而不是现在。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...使用这样的lambdas使代码更可读性和简洁,但请记住,不要深深地嵌套lambda表达式,否则您将创建代码维护噩梦。明智地使用它们。
我们想要的是处理用户从其商店加载用户加载用户后的建议请求。
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}重要的是,每次我们从UserDataService收到回调电话时,我们都会执行CPU密集的建议规则并回电我们的呼叫者。好吧,我们要做的是在我们的回调队列中加入一个可运行的,后来我们会迭代这些何时?
建议服务的队列为空时,可以通知RecommendationService ,启动了新批次,并且已达到批处理限制。这些都是从UserDataService处理回调的好时机。
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}重要的是要记住在处理另一个服务的另一个微服务的回调时,在处理客户端的更多不断访问请求之前。本质上,您的客户一直在等待(异步等待但仍在等待),这些客户可能会像HTTP调用一样代表开放的TCP/IP连接,因此最好在处理更多请求之前将其关闭,就像我们说他们已经在等待周围有一个开放连接供用户加载形式的用户服务。
要了解有关回调的更多信息,PLESAE读取[QBIT Java Microservice LIB回调基础]([粗切] Qbit Microservice lib与回调一起工作)。
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...您可以组成碎片工人(用于内存,线程安全,CPU密集型服务)或IO工人或与外国服务或外国公共汽车交谈。
这是一个使用其中三名服务人员的工人池的示例:
假设您的服务可以做点什么:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}现在,这可以做些IO,您想拥有一定的运行库,而不仅仅是一个运行,以便您可以并行进行IO。经过一些性能测试,您发现三个是魔术数字。
您想使用API访问此服务:
public interface MultiWorkerClient {
void doSomeWork (...);
}现在,让我们创建这些银行并使用它。
首先创建添加线程/队列/微键的QBIT服务。
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();现在将它们添加到服务工作人员对象中。
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workers您可以将服务,POJOS和方法消费者,方法调度程序添加到服务捆绑包中。服务捆绑包是QBIT的集成点。
让我们添加我们的新服务人员。 ServiceWorkers是ServiceMethodDisPatcher。
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();我们可能会在服务捆绑包中添加辅助方法,因此大部分可以在单个呼叫中发生。
现在您可以开始使用工人。
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );现在,您可以使用Spring或Guice配置构建器和服务捆绑包。但是,您可以像上面的那样做到这一点,这对于测试和理解QBIT内部词非常有用。
QBIT还支持碎片服务的概念,该概念非常适合CPU等碎片资源(为用户推荐引擎运行每个CPU核心上的规则引擎)。
QBIT不知道如何分解您的服务,您必须提示它。您可以通过碎片规则进行此操作。
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}我们在一个应用程序上工作,在该应用程序中,对服务的第一个论点是用户名,然后我们将其用于将CPU密集型内存中的内存性规则引擎打电话。该技术有效。 :)
ServiceWorkers类有一种创建碎片工人池的方法。
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}要使用您的服务工作人员时,只需通过碎片键即可。
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});然后将您的服务添加到服务工作人员组成中。
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}然后像以前一样将其添加到服务捆绑包中。
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();然后只使用它:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}ShardonBeanPath使您可以创建一个复杂的Bean路径导航调用,并使用其属性进行碎片。
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );在此处阅读有关服务碎片和服务工作人员的更多信息
您可以在Wiki中找到更多。也遵循提交。我们一直很忙。 QBIT用于Java -JSON,REST,WEBSOCKECT的微服务lib。