Qbit Java Micorservices lib Tutorials | Qbit -Website | Qbit verwendet REAKT | Qbit funktioniert mit Vert.x | Wiederholungsvertx
Die Java Microservice Lib. Qbit ist eine reaktive Programmierung für das Erstellen von Microservices - JSON, HTTP, WebSocket und Ruhe. Qbit verwendet reaktive Programme, um elastische Ruhe- und Websockets -basierte Cloud -freundliche Webdienste zu erstellen. SOA hat sich für Mobile und Cloud entwickelt. Servicediscovery, Gesundheit, reaktiver Statistik, Ereignisse, Java -idiomatische reaktive Programmierung für Microservices.
Hast du eine Frage? Fragen Sie hier: Qbit Google Group.
Alles ist eine Warteschlange. Sie haben eine Wahl. Sie können es umarmen und kontrollieren. Sie können dafür optimieren. Oder Sie können sich hinter Abstraktionen verstecken. Qbit öffnet Sie dem Blick in das, was vor sich geht, und ermöglicht es Ihnen, einige Hebel zu ziehen, ohne Ihre Seele zu verkaufen.
Qbit ist eine Bibliothek kein Framework. Sie können Qbit mit Frühling, Guice usw. mischen und anpassen.
Qbit unterstützt jetzt die rauakt invokablen Versprechen für lokale und Remote -Client -Proxies. Dies gibt eine schöne, fließende API für die asynchronisierte Programmierung.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();Qbit -Rückrufe sind jetzt auch REAKT -Rückrufe ohne den QBIT -Vertrag für Rückrufe.
Weitere Informationen finden Sie unter REAKT Invokable Versprechen.
Qbit wird im Maven Public Repo veröffentlicht.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'Eingesetzt in mehreren großen Fortune -100 -Unternehmen. Qbit arbeitet jetzt mit Vertx (eigenständig oder eingebettet). Sie können QBIT auch für Nicht-Qbit-Projekte verwenden, es ist nur eine LIB.
Apache 2
Qbit verfügt über InProc-Dienste, REST-Microservices und Websocket-Microservices sowie einen In-Proc-Service-Event-Bus (der pro Modul oder pro App sein kann). Es unterstützt Arbeitnehmer und Memory-Dienste.
Bevor wir mehr beschreiben, finden Sie hier zwei Beispieldienste:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}Letztendlich ist QBIT eine einfache Bibliothek kein Framework. Ihre App ist keine Qbit -App, sondern eine Java -App, die die QBit Lib verwendet. Mit QBIT können Sie mit Java -Util -gleichzeitiger Arbeit arbeiten und sich nicht vorbereitet, es vor Ihnen zu verbergen. Ich versuche nur, den Stich herauszunehmen.
Wir haben Techniken in Boon und Qbit mit großem Erfolg in hochwertigen, leistungsstarken Apps verwendet. Wir haben Kunden geholfen, 10x mit 1/10 die Server ihrer Konkurrenten mit Techniken in Qbit umzugehen. Qbit ist es satt, dass wir von Hand -Tuning -Warteschlangenzugriff und Fäden satt sind.
Ideen für Segen und Qbit kommen oft aus dem gesamten Web. Wir machen Fehler. Weisen Sie sie darauf hin. Als Entwickler von Boon und Qbit sind wir Mitreisende. Wenn Sie eine Idee oder Technik haben, die Sie teilen möchten, hören wir zu.
Eine große Inspiration für Boon/Qbit war Vertx, Akka, Go -Kanäle, aktive Objekte, Apartmentmodell -Threading, Schauspieler und die mechanischen Sympathiepapiere.
Qbit hat Ideen, die vielen Frameworks ähnlich sind. Wir alle lesen dieselben Papiere. Qbit wurde von den LMAX Disruptor Papers und in diesem Blog -Beitrag über Link Transfer -Warteschlange und Disruptor inspiriert. Wir hatten einige Theorien über Warteschlangen, die uns dazu inspiriert haben, sie auszuprobieren. Einige dieser Theorien werden in einigen der größten Middleware -Backends eingesetzt, deren Namensmarken weltweit bekannt sind. Und so wurde Qbit geboren.
Qbit ließ sich auch sehr von der großartigen Arbeit von Tim Fox auf Vertx inspirieren. Das erste Projekt, das etwas verwendet, das tatsächlich als QBIT (wenn auch früh qbit) bezeichnet werden könnte, bestand darin, Vertx in einem Web/Mobile -Microservice für eine App zu verwenden, die möglicherweise 80 Millionen Benutzer haben könnte. Es war diese Erfahrung mit Vertx und dem frühen Qbit, das zu QBIT -Entwicklung und -entwicklung führte. Qbit basiert auf den Schultern von Riesen (Netty/Vertx).
Spring Disruptor: Nein. Sie können QBIT verwenden, um Plugins für den Frühlingsstörer zu schreiben, nehme an, aber Qbit konkurriert nicht mit Spring Disruptor. Spring Boot/Spring MVC: Nein. Wir verwenden dieselben Anmerkungen, aber Qbit ist auf Hochgeschwindigkeits-Memory-Microservices ausgerichtet. Es ist eher wie Akka als wie Spring Boot. Qbit verfügt über eine Untergruppe der Funktionen von Spring MVC, die nur für Microservices, dh, Websocket RPC, REST, JSON Marshaling usw. ausgerichtet ist. Akka: Nein. Akka hat ähnliche Konzepte, aber sie verfolgen einen anderen Ansatz. Qbit konzentriert sich mehr auf Java und Microservices (Rest, JSON, Websocket) als auf Akka. LMAX Disruptor: Nein. Tatsächlich können wir Disruptor wie die Warteschlangen verwenden, die Qbit unter den Abdeckungen verwendet.
(Frühe Benchmarks wurden entfernt. Sie waren hier. Qbit wurde viel schneller. Das Benchmarking Qbit ist momentan ein bewegendes Ziel. Links und Berichte werden erstellt.)
Codebeispiele
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)Qbit ist eine Warteschlangenbibliothek für Microservices. Es ähnelt vielen anderen Projekten wie Akka, Frühlingsreaktor usw. Qbit ist nur eine Bibliothek, keine Plattform. Qbit hat Bibliotheken, um einen Dienst hinter eine Warteschlange zu stellen. Sie können QBit -Warteschlangen direkt verwenden oder einen Dienst erstellen. Qbit -Dienste können von WebSocket, HTTP, HTTP -Pipeline und anderen Remotten entlarvt werden. Ein Dienst in QBIT ist eine Java -Klasse, deren Methoden hinter Service -Warteschlangen ausgeführt werden. Qbit implementiert Apartmentmodell -Threading und ähnelt dem Schauspielermodell, oder eine bessere Beschreibung wäre aktive Objekte. Qbit verwendet keinen Disruptor (könnte). Es verwendet reguläre Java -Warteschlangen. Qbit kann nördlich von 100 Millionen Ping -Pong -Anrufen pro Sekunde durchführen, was eine erstaunliche Geschwindigkeit ist (bis zu 200 m). Qbit unterstützt auch die Anrufdienste über REL und WebSocket. Qbit ist Microservices im reinen Web-Sinn: JSON, HTTP, WebSocket usw. Qbit verwendet Micro-Batching, um Nachrichten durch die Pipe (Warteschlange, IO usw.) schneller zu schieben, um die Übergabe von Fäden zu reduzieren.
Qbit ist ein Java Microservice Lib, der Ruhe, JSON und Websocket unterstützt. Es ist in Java geschrieben, aber wir könnten eines Tages eine Version in Rust oder Go oder C# schreiben (aber das würde einen großen Zahltag erfordern).
Service Pojo (einfaches altes Java -Objekt) hinter einer Warteschlange, mit der Methodenaufrufe über Proxy -Anrufe oder -ereignisse empfangen werden können (können einen Thread -Verwalten von Ereignissen, Methodenaufrufen und Antworten für Methodenaufrufe und Ereignisse und die andere für Antworten So Antworthandlern so Antwort -Handlungen haben Blockieren Sie den Service nicht. Services können REST -Anmerkungen im Spring MVC -Stil nutzen, um sich über Ruhe und Websocket der Außenwelt auszusetzen.
ServiceBundle viele Pojos hinter einer Antwortwarteschlange und viele erhalten Warteschlangen. Es kann einen Thread für alle Antworten geben oder nicht. Sie können auch eine empfangene Warteschlange sein.
Warteschlange einen Thread, der eine Warteschlange verwaltet. Es unterstützt die Charge. Es hat Ereignisse für leer, erreichbar, gestartet, im Leerlauf. Sie können diese Ereignisse aus Diensten anhören, die hinter einer Warteschlange stehen. Sie müssen keine Dienste nutzen. Sie können Queue's Direct verwenden. In Qbit haben Sie Absenderwarteschlangen und Empfänger -Warteschlangen. Sie sind getrennt, um die Mikrobedeckung zu unterstützen.
ServiceDpointServer ServiceBundle, das der Kommunikation von Ruhe und Websocket ausgesetzt ist.
EventBus EventBus ist eine Möglichkeit, viele Nachrichten an Dienste zu senden, die möglicherweise locker gekoppelt sein können.
ClientProxy ClientProxy ist eine Möglichkeit, Dienst über eine asynchronisierende Schnittstelle aufzurufen. Der Dienst kann inProc (selben Prozess) oder über WebSocket gestaltet werden.
Nicht blockierende Qbit ist eine nicht blockierende LIB. Sie verwenden Rückrufe über Java 8 Lambdas. Sie können auch Ereignisnachrichten senden und Antworten abrufen. Messaging ist in das System eingebaut, sodass Sie komplexe Aufgaben leicht koordinieren können. Qbit verfolgt einen objektorientierten Ansatz für die Serviceentwicklung, sodass Dienste wie normale Java-Dienste aussehen, die Sie bereits schreiben, aber die Dienste leben hinter einer Warteschlange/Thread. Dies ist kein neues Konzept. Microsoft hat dies mit DCOM/COM gemacht und es aktive Objekte genannt. Akka macht es mit Schauspielern und nannte sie stark getippte Schauspieler. Die wichtigen Konzepte sind, dass Sie die Geschwindigkeit von Messaging von reaktiven und Schauspielern erhalten, sich jedoch in einem natürlichen OOP -Ansatz entwickeln. Qbit ist nicht der erste. Qbit ist nicht der einzige.
Speed Qbit ist sehr schnell. Es gibt natürlich viel Raum für Verbesserungen. Aber bereits 200 m+ TPS In-Proc-Ping-Pong, 10m-20m+ TPS-Ereignisbus, 500k TPS RPC-Anrufe über WebSocket/JSON usw. müssen mehr Arbeit geleistet werden, um die Geschwindigkeit zu verbessern. Jetzt ist es jedoch schnell genug, wo wir uns mehr auf die Benutzerfreundlichkeit konzentrieren. Der JSON -Support verwendet Boon standardmäßig, was bis zu 4x schneller ist als andere JSON -Parsers für den Anwendungsfall Rest/JSON, WebSocket/JSON.
Reaktive Programmierung QBIT bietet einen Reaktor zum Verwalten von asynchronen Aufrufen. Auf diese Weise können Rückrufe auf demselben Thread behandelt werden, der sie aufgerufen hat, und es bietet eine Zeitüberschreitung und Fehlerbehandlung. Lesen Sie das Reaktor -Tutorial zur Erstellung von reaktiven Mikro -Service -Programme
Service Discovery bildete Unterstützung für die Service -Erkennung. Dies beinhaltet die Integration mit Konsul.
Statistik -Statistik für Statistiken. Der StatService kann in Statsd (Graphit, Grafana, Datadog usw.) integriert werden, um passive Statistiken zu veröffentlichen. Oder Sie können den Statistikmotor abfragen und auf die Statistiken (Zählungen, Zeitpunkte und Ebenen) reagieren. Der StatisticSservice ist ein reaktives Statistiksystem, das gruppiert werden kann. Der StatService ist insofern reaktiv, als Ihre Dienste es veröffentlichen und anhand der Ergebnisse reagieren können. Sie können Dinge wie die Ratenbeschränkung implementieren und auf eine erhöhte Rate von etwas reagieren. Das servicediscovery -System integriert sich in das HealthSystem und den Konsul, um jeden Ihrer internen Dienste zu rollen, aus denen Sie den Mikroservice ausmachen, und das Verbundwerkstoff Ihres Micro -Dienstes für einen einzelnen HTTP -Endpunkt oder einen Dead Mans -Switch in Consul (TTL) zu veröffentlichen.
Reden ist billig. Schauen wir uns einen Code an. Sie können im Wiki einen detaillierten Spaziergang durchführen. Wir haben bereits eine Menge Dokumentation.
Wir werden einen Dienst erstellen, der über REST/JSON freigelegt wird.
Um die Größe der Todo -Liste abzufragen:
curl localhost:8080/services/todo-service/todo/countSo fügen Sie einen neuen Todo -Artikel hinzu.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todoUm eine Liste von Todoartikeln zu erhalten
curl http://localhost:8080/services/todo-service/todo/Das Todo -Beispiel verwendet und verfolgt Todo -Elemente.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;Der TodoService verwendet Anmerkungen im Spring MVC -Stil.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
} Sie können Nicht-Json posten/setzen und den Körper als String oder als byte[] erfassen. Wenn der Inhaltstyp auf alles andere als application/json eingestellt ist und Ihr Körper eine Zeichenfolge oder Byte [] definiert ist. Dies funktioniert automatisch. (Der Inhalt muss festgelegt werden.)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
} Standardmäßig sendet Qbit einen 200 (OK) für einen nicht-luiden-Anruf (ein Anruf mit einer Rückgabe oder einem Rückruf). Wenn der Restbetrieb keine Rückgabe oder keinen Rückruf hat, sendet Qbit eine 202 (akzeptiert). Es kann Zeiten geben, in denen Sie einen 201 (erstellten) oder einen anderen Code senden möchten, der keine Ausnahme darstellt. Sie können dies tun, indem Sie code auf @RequestMapping festlegen. Standardmäßig ist der Code -1, das bedeutet, dass das Standardverhalten (200 für den Erfolg, 202 für Einweg -Nachricht und 500 für Fehler) verwendet wird.
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks können auch für interne Dienste verwendet werden. Es ist häufig der Fall, dass Sie einen Callbackbuilder oder einen QBit -Reaktor verwenden, um Serviceanrufe zu verwalten.
Sie müssen keine JSON -Formrückrufe zurückgeben. Sie können einen Binär- oder einen beliebigen Text mithilfe von HttpBinaryResponse und HttpTextResponse zurückgeben.
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}Warum haben wir Anmerkungen im Frühjahrsstil ausgewählt?
Jetzt fang einfach an.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}Das ist es. Es gibt auch den Box WebSocket -Support bei der CLUT -Side -Proxy -Generation, sodass Sie die Dienste mit Millionen von Anrufen pro Sekunde in den Rufen bringen können.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}Sie können QBIT -Dienste jederzeit über einen WebSocket -Proxy aufrufen. Der Vorteil eines WebSocket -Proxy besteht darin, dass Sie 1m RPC+ pro Sekunde ausführen (1 Million Remote -Anrufe pro Sekunde).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );Die Ausgabe ist 3.
3
Die obige Verwendung verwendet eine WebSocket -Proxy -Schnittstelle, um den Service Async aufzurufen.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}Erstellen Sie den WebSocket -Service -Client, der sich bewusst ist.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" ); Derzeit lädt der clientBuilder alle Serviceendpunkte, die unter dem Dienstnamen registriert sind, und wählen zufällig eine aus.
Servicediscovery umfasst Konsulbasiert, das Ansehen von JSON -Dateien auf der Festplatte und DNS. Es ist einfach, auch Ihre eigene Service -Erkennung zu schreiben und sie an Qbit zu schließen.
In Zukunft können wir Roundrobin -Anrufe oder Shard -Anrufe zum WebSocket -Service und/oder automatisch angeben, wenn die Verbindung geschlossen ist. Wir tun dies für den Eventbus, der Service Discovery verwendet, aber es wird noch nicht in WebSocket -basierte Client -Stubs gebacken.
Das letzte Client -Beispiel verwendet WebSocket. Sie können auch REST verwenden und tatsächlich die von uns eingerichteten URI -Parameter verwenden. Ruhe ist schön, aber es wird langsamer als WebSocket -Support.
Qbit wird mit einem netten kleinen HTTP -Kunden geliefert. Wir können es benutzen.
Sie können es verwenden, um asynchronisierte Anrufe und Websocket -Nachrichten mit dem HTTP -Client zu senden.
Hier werden wir den HTTP -Client verwenden, um unsere Remote -Methode aufzurufen:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );Die Ausgabe ist 4.
4
Sie können auch auf den Service von Curl zugreifen.
$ curl http://localhost:7000/services/adder-service/add/2/2In diesem vollständigen Beispiel finden Sie hier: Qbit Microservice Erste Schritte Tutorial.
Qbit URI -Params und Websocket -Proxy -Client
Qbit verfügt über eine Bibliothek für die Arbeit und das Schreiben von asynchronen Microservices, die leicht und unterhaltsam sind.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Halten Sie nun den Server und den Client an. Ziemlich einfach wie?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
Der Fokus liegt auf der Benutzerfreundlichkeit und der Verwendung von Java 8 Lambdas für Rückrufe, damit der Code eng und klein ist.
Erfahren Sie hier mehr über den MicroService Style -Support von QBIT.
Probieren wir jetzt unseren HTTP -Client aus.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();Sie übergeben einfach die URL, den Port und rufen dann Start an.
Jetzt können Sie mit dem Senden von HTTP -Anfragen beginnen.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );Eine HTTP -Antwort enthält nur die Ergebnisse des Servers.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}Es gibt Helfermethoden für Sync HTTP -Anrufe.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
Die Puts -Methode ist eine Helfermethode, die es übrigens mehr oder weniger macht.
Die ersten fünf Parameter werden abgedeckt. Über fünf hinaus müssen Sie den HTTPBuilder verwenden.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );Es gibt auch asynchronisierte Aufrufe für Get.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[Weitere Informationen zu dem einfachen, schnellen Microservice-HTTP-Client finden Sie hier] (https://github.com/advantageous/qbit/wiki/%5bdoc%5d-using-qbit-microservice-lib'S-httpclient-get,-post ,-Post, -et-al, -json, -java-8-lambda).
Qbit ermöglicht es auch, dass Dienste hinter Warteschlangen auch in der Ausgabe ausgeführt werden.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());Ein detailliertes Tutorial zu In-Proc-Diensten wird geschrieben.
Qbit Event Bus detaillierteres Beispiel
Qbit hat auch einen Service -Event -Bus. Dieses Beispiel ist ein Beispiel für Mitarbeiterleistungen.
Wir haben zwei Kanäle.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Ein Mitarbeiterobjekt sieht so aus:
public static class Employee {
final String firstName ;
final int employeeId ;Dieses Beispiel hat drei Dienste: Mitarbeiterservice, Benefitsservice und Gehaltsabrechnung.
Diese Dienste sind INPROC -Dienste. QBIT unterstützt auch WebSocket-, HTTP- und REST -Remotedienste, aber im Moment konzentrieren wir uns auf INPROC -Dienste. Wenn Sie InProc verstehen, werden Sie Remote verstehen.
Der Mitarbeiterservice wird tatsächlich die Ereignisse auf zwei andere Dienste abgefeuert.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}Beachten Sie, dass wir SendArray anrufen, damit wir den Mitarbeiter und sein Gehalt senden können. Der Zuhörer für Payroll_Adjustment_Channel muss sowohl einen Mitarbeiter als auch einen INT verarbeiten, das das neue Gehalt der Mitarbeiter darstellt. Sie können auch Event -Bus -Proxies verwenden, damit Sie überhaupt nicht in den Event -Bus anrufen müssen.
Der Benefitsservice hört auf neue Mitarbeiter, die eingestellt werden, damit sie in das Leistungssystem einschreiben können.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}Papa muss bezahlt werden.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}Der Mitarbeiter ist das Arbeitnehmerobjekt aus dem Mitarbeiterservice.
So können Sie Ihre Leistungen erhalten und bezahlt!
Weitere Details finden Sie hier:
Qbit Event Bus detaillierteres Beispiel
Sie können Ihre eigene Schnittstelle zum Event -Bus definieren und Ihre eigenen Veranstaltungsbusse mit QBIT verwenden. Jedes Modul in Ihrem Service kann einen eigenen internen Eventbus haben.
Weitere Informationen zu erfahren: Qbit Microservice arbeitet mit einem privaten Eventbus und Qbit Java Microservice Lib mit einer eigenen Schnittstelle zum Event -Bus.
Um QBIT wirklich zu erfassen, muss man die Konzepte eines Rückrufs erfassen.
Ein Rückruf ist ein Weg, um eine asynchronisierte Antwort in QBIT zu erhalten.
Sie rufen eine Servicemethode an und ruft Sie zurück.
Client -Proxys können Rückrufe haben:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}Rückrufe sind Java 8 -Verbraucher mit einer optionalen zusätzlichen Fehlerbehandlung.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}Dienste, die blockieren können, sollten Rückrufe verwenden. Wenn LoadUser im folgenden Beispiel blockiert wird, sollte es wirklich einen Rückruf verwenden, anstatt einen Wert zurückzugeben.
öffentliche Klassenempfehlservice {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
} Stellen loadUser uns so aus, Benutzerdaten aus einer Datenbank oder aus anderen Diensten. Mit anderen Worten, loadUser kann möglicherweise IO blockieren.
Unser Kunde blockiert nicht, aber unser Service. Zurück zu unserem RecommendationService . Wenn wir viele Cache -Hits für Benutzerladungen erhalten, ist der Block möglicherweise nicht so lang, aber er wird da sein und jedes Mal, wenn wir einen Benutzer vorlegen müssen, ist das gesamte System zugenommen. Wir möchten in der Lage sein, wenn wir die UserDataService nicht bewältigen können. Wenn dieser asynchronisierte Rückruf zurückkommt, behandeln wir diese Anfrage. In der Zwischenzeit bearbeiten wir Empfehlungslisten so schnell wie möglich. Wir blockieren nie.
Lassen Sie uns also den Dienst überdenken. Das erste, was wir tun werden, ist, die Servicemethode einen Rückruf zu machen. Bevor wir das tun, lassen Sie uns einige Regeln festlegen.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {Jetzt nehmen wir einen Rückruf ab und können entscheiden, wann wir diese Empfehlungsgenerierungsanfrage bearbeiten möchten. Wir können es sofort tun, wenn Benutzerdaten, die wir benötigen, in Memory sind, oder wir können sie verzögern.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
} Wenn der Benutzer im Cache gefunden wird, führen wir unsere Empfehlungsregeln in Memory aus und rufen den Callback sofort recommendationsCallback.accept(runRulesEngineAgainstUser(user)) auf.
Der interessante Teil ist, was wir tun, wenn der Benutzer nicht geladen ist.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
... Hier handleLoadFromUserDataService wir einen Rückruf zum Laden des Benutzer.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...Wenn Sie Lambdas wie diesen verwenden, ist der Code lesbarer und knackiger, aber denken Sie daran, dass Sie Lambda -Ausdrücke nicht zutiefst nisten, oder Sie erstellen einen Albtraum für die Code -Wartung. Benutze sie mit Bedacht.
Wir wollen die Empfehlungsanforderung, nachdem das Benutzerdienstsystem den Benutzer aus dem Geschäft geladen hat.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
} Der wichtige Teil dafür ist, dass wir jedes Mal, wenn wir einen Rückruf von UserDataService erhalten, unsere CPU -Intensivempfehlungsregeln und unsere Anrufer unseres Anrufers durchführen. Nun, nicht genau, was wir tun, ist, einen Runnable in unsere Callbacks -Warteschlange zu setzen, und später werden wir durch diese durchlaufen, aber wann?
Der RecommendationService kann benachrichtigt werden, wenn seine Warteschlange leer ist, eine neue Charge gestartet wurde und wenn sie eine Chargengrenze erreicht hat. Dies sind alles gute Zeiten, um Rückrufe aus dem UserDataService zu verarbeiten.
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}Es ist wichtig, sich zu erinnern, wenn Sie Rückrufe von einem anderen Microservice abwickeln, den Sie aus dem anderen Service abwickeln möchten, bevor Sie weitere nicht auftretende Anfragen von Ihren Kunden bearbeiten. Im Wesentlichen haben Sie Kunden, die gewartet haben (asynchronisiert, aber immer noch), und diese Kunden könnten eine offene TCP/IP -Verbindung wie ein HTTP Mit einer offenen Verbindung, mit der Benutzer den Benutzerdienst laden können.
Um mehr über Callbacks zu erfahren, las PLESAE [Qbit Java Microservice Lib Callback Callback -Grundlagen] ([Rough Cut] Qbit Microservice Lib, die mit Rückrufen arbeitet).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...Sie können Sharded-Arbeiter (für In-Memory, Thread Safe, CPU-Intensivdienste) oder Arbeitnehmer für IO oder ein Gespräch mit ausländischen Diensten oder ausländischen Bussen komponieren.
Hier ist ein Beispiel, bei dem ein Arbeiterpool mit drei Servicemitern darin verwendet wird:
Nehmen wir an, Sie haben einen Service, der etwas tut:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}Dies macht eine Art IO und Sie möchten eine Bank von diesen haben, die nicht nur eine läuft, damit Sie parallel tun können. Nach einigen Leistungstests haben Sie herausgefunden, dass drei die magische Zahl sind.
Sie möchten Ihre API zum Zugriff auf diesen Dienst verwenden:
public interface MultiWorkerClient {
void doSomeWork (...);
}Lassen Sie uns nun eine Bank von diesen erstellen und sie verwenden.
Erstellen Sie zuerst die QBit -Dienste, die den Thread/die Warteschlange/die Mikrobatch hinzufügen.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();Fügen Sie sie nun zu einem Servicearbeiter -Objekt hinzu.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workersSie können Dienste, Pojos und Methodenverbraucher, Methoden -Dispatcher zu einem Servicebündel hinzufügen. Das Service -Bundle ist ein Integrationspunkt in QBIT.
Fügen wir unsere neuen Servicearbeiter hinzu. Serviceworker ist ein Servicemethoddispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Wir werden dem Service -Bundle wahrscheinlich eine Helfermethode hinzufügen, sodass das meiste davon in einem einzigen Anruf passieren kann.
Jetzt können Sie Ihre Mitarbeiter verwenden.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );Jetzt können Sie Spring oder Guice verwenden, um die Bauherren und das Servicebündel zu konfigurieren. Aber Sie können es einfach so tun wie das oben genannte, was gut zum Testen und Verständnis von Qbit -Interna ist.
Qbit unterstützt auch das Konzept von Sharded Services, das für Sharding -Ressourcen wie CPU gut geeignet ist (führen Sie eine Regeln -Engine für jeden CPU -Kern für eine Benutzerempfehlungs -Engine aus).
Qbit weiß nicht, wie Sie Ihre Dienste schärfen, Sie müssen ihm einen Hinweis geben. Sie tun dies durch eine Shard -Regel.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}Wir haben an einer App gearbeitet, in der das erste Argument für die Dienste der Benutzername war, und dann benutzten wir das, um Anrufe zu einer CPU-Intensiv-In-Memory-Regeln-Engine zu schärfen. Diese Technik funktioniert. :)
Die Klasse für Servicearbeiter verfügt über eine Methode zum Erstellen eines Sharded Worker -Pools.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}Um zu verwenden, übergeben Sie einfach einen Shard -Schlüssel, wenn Sie die Servicearbeiter erstellen.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});Fügen Sie dann Ihre Dienste zur Komposition der Servicearbeiter hinzu.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}Fügen Sie es dann wie zuvor zum Servicepaket hinzu.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Dann benutze es einfach:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}Mit dem ShardonbeanPath können Sie einen komplexen Bean -Pfad -Navigationsanruf erstellen und seine Eigenschaft zum Shard verwenden.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );Lesen Sie hier mehr über Service Sharding- und Service -Mitarbeiter
Sie können im Wiki noch viel mehr finden. Folgen Sie auch den Commits. Wir waren beschäftigt Biber. Qbit the Microservice Lib für Java - JSON, Ruhe, Websocket.