Tutoriels QBit Java MicorServices Lib | Site Web QBIT | QBit utilise REAKT | Qbit fonctionne avec vert.x | REAKT VERTX
La lib microservice Java. QBIT est une LIB de programmation réactive pour la création de microservices - JSON, HTTP, WebSocket et Rest. QBIT utilise une programmation réactive pour créer des services Web élastiques et des services Web basés sur les points Web. SOA a évolué pour le mobile et le cloud. Servicevurvegory, santé, statistique réactive, événements, programmation réactive idiomatique Java pour les microservices.
Vous avez une question? Demandez ici: QBIT Google Group.
Tout est une file d'attente. Vous avez le choix. Vous pouvez l'embrasser et le contrôler. Vous pouvez optimiser pour cela. Ou vous pouvez vous cacher derrière des abstractions. QBit vous ouvre à un coup d'œil dans ce qui se passe et vous permet de tirer des leviers sans vendre votre âme.
QBIT est une bibliothèque et non un cadre. Vous pouvez mélanger et assortir QBIT avec le printemps, le Guice, etc.
QBIT prend désormais en charge les promesses invoquables de REAKT pour les proxys clients locaux et distants. Cela donne une belle API courante pour la programmation asynchrone.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();Les rappels QBIT sont désormais également des rappels REAKT sans rompre le contrat QBIT pour les rappels.
Voir les promesses invoquées de REAKT pour plus de détails.
QBIT est publié au Maven Public Repo.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'Déployé dans plusieurs grandes entreprises du Fortune 100. QBit fonctionne désormais avec Vertx (autonome ou intégré). Vous pouvez également utiliser QBIT sur des projets non QBIT, ce n'est qu'une lib.
Apache 2
QBit a des services InProc, des microservices de repos et des microservices WebSocket ainsi qu'un bus d'événements de services In-Proc (qui peut être par module ou par application). Il soutient les travailleurs et les services en mémoire.
Avant de décrire plus, voici deux exemples de services:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}À la fin de la journée, QBIT est une bibliothèque simple et non un cadre. Votre application n'est pas une application QBIT mais une application Java qui utilise la LIB QBIT. QBIT vous permet de travailler avec Java Util simultanément et ne s'efforce pas de vous le cacher. J'essaie juste d'en retirer la piqûre.
Nous avons utilisé des techniques dans BOON et QBIT avec un grand succès dans des applications haut de gamme, hautes performances et à grande échelle. Nous avons aidé les clients à gérer 10x la charge avec 1 / 10e les serveurs de leurs concurrents en utilisant des techniques dans QBIT. QBit est en train de nous avoir mal à la main et à l'accès à la file d'attente à la main.
Les idées pour Boon et Qbit proviennent souvent de partout sur le Web. Nous faisons des erreurs. Indiquez-les. En tant que développeur de Boon et Qbit, nous sommes collègues voyageurs. Si vous avez une idée ou une technique que vous souhaitez partager, nous écoutons.
Une grande inspiration pour Boon / Qbit a été Vertx, Akka, GO Cannels, des objets actifs, le filetage du modèle d'appartement, l'acteur et les articles de sympathie mécanique.
QBit a des idées similaires à de nombreux cadres. Nous lisons tous les mêmes articles. QBIT s'est inspiré des documents de perturbateur LMAX et de cet article de blog sur la file d'attente de transfert de liens contre le perturbateur. Nous avions des théories sur les files d'attente que le billet de blog nous a inspirés pour les essayer. Certaines de ces théories sont déployées dans certains des plus grands backends de middleware et dont les marques de nom sont connues dans le monde. Et ainsi Qbit est né.
QBIT a également pris beaucoup d'inspiration par l'excellent travail réalisé par Tim Fox sur Vertx. Le premier projet utilisant quelque chose qui pourrait réellement être appelé QBIT (quoique précoce QBIT) a été d'utiliser VERTX sur un microservice Web / mobile pour une application qui pourrait potentiellement avoir 80 millions d'utilisateurs. C'est cette expérience avec Vertx et le QBIT précoce qui a conduit au développement et à l'évolution du QBIT. QBIT est construit sur les épaules des géants (Netty / Vertx).
Disrupteur de printemps: Non. Spring Boot / Spring MVC: Non. Nous utilisons les mêmes annotations mais QBIT est conçu pour les microservices en mémoire à grande vitesse. Il ressemble plus à Akka qu'à Spring Boot. QBIT a un sous-ensemble des fonctionnalités de Spring MVC SEMPS uniquement pour les microservices, c'est-à-dire, WebSocket RPC, REST, JSON Marshaling, etc. Akka: Non. Eh bien peut-être. Akka a des concepts similaires mais ils adoptent une approche différente. QBIT est plus axé sur Java et les microservices (REST, JSON, WebSocket) qu'Akka. Perturbateur de LMAX: Non. En fait, nous pouvons utiliser le perturbateur comme sur les files d'attente que QBIT utilise sous les couvertures.
(Les premiers repères ont été supprimés. Ils étaient ici. Qbit est devenu beaucoup plus rapide. Benchmarking Qbit est une cible en mouvement pour le moment. Des liens et des rapports seront créés.)
Exemples de code
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)QBIT est une bibliothèque de file d'attente pour les microservices. Il est similaire à de nombreux autres projets comme Akka, Spring Reactor, etc. Qbit n'est qu'une bibliothèque et non une plate-forme. QBIT dispose de bibliothèques pour mettre un service derrière une file d'attente. Vous pouvez utiliser des files d'attente QBIT directement ou vous pouvez créer un service. Les services QBIT peuvent être exposés par WebSocket, HTTP, HTTP Pipeline et autres types de télécommande. Un service dans QBIT est une classe Java dont les méthodes sont exécutées derrière des files d'attente de service. QBIT implémente le filetage du modèle d'appartement et est similaire au modèle d'acteur ou une meilleure description serait des objets actifs. QBIT n'utilise pas de perturbateur (mais pourrait). Il utilise des files d'attente Java régulières. QBIT peut faire le nord de 100 millions d'appels de ping-pong par seconde, ce qui est une vitesse incroyable (voire jusqu'à 200 m). QBIT prend également en charge les services d'appel via REST et WebSocket. QBIT est des microservices au sens du Web pur: JSON, HTTP, WebSocket, etc. QBIT utilise un micro-lots pour pousser les messages à travers le tuyau (file d'attente, IO, etc.) plus rapidement pour réduire le transfert de threads.
QBIT est une lib Java Microservice prenant en charge REST, JSON et WebSocket. Il est écrit en Java mais nous pourrions un jour écrire une version en rouille ou en go ou C # (mais cela nécessiterait un grand salaire).
Service Pojo (Plain Old Java Object) derrière une file d'attente qui peut recevoir des appels de méthode via des appels ou des événements proxy (peut avoir un thread de gestion des événements, des appels de méthode et des réponses ou deux pour les appels et les événements de méthode et l'autre pour les réponses afin que les gestionnaires de réponse Ne bloquez pas le service. Les services peuvent utiliser des annotations de repos de style Spring MVC pour s'exposer au monde extérieur via REST et WebSocket.
ServiceBundle de nombreux pojos derrière une file d'attente de réponse et beaucoup reçoivent des files d'attente. Il peut y avoir un fil pour toutes les réponses ou non. Ils peuvent également être une file d'attente.
File d'attente un fil qui gérait une file d'attente. Il prend en charge le lot. Il a des événements pour vide, atteint la limite, démarré, inactif. Vous pouvez écouter ces événements à partir de services qui se trouvent derrière une file d'attente. Vous n'avez pas besoin d'utiliser des services. Vous pouvez utiliser la file d'attente directe. Dans QBIT, vous avez des files d'attente d'expéditeurs et des files d'attente de récepteurs. Ils sont séparés pour prendre en charge le micro-dossard.
ServiceDpointServer ServiceBundle exposé au repos et à la communication WebSocket.
EventBus EventBus est un moyen d'envoyer beaucoup de messages à des services qui peuvent être couplés de manière lâche.
ClientProxy ClientProxy est un moyen d'invoquer le service via l'interface asynchrone, le service peut être InProc (même processus) ou éloigné sur WebSocket.
Le QBIT non bloquant est une lib non bloquante. Vous utilisez des rappels via Java 8 Lambdas. Vous pouvez également envoyer des messages d'événement et obtenir des réponses. La messagerie est intégrée dans le système afin que vous puissiez facilement coordonner les tâches complexes. QBIT adopte une approche orientée objet pour le développement de services afin que les services ressemblent à des services Java normaux que vous écrivez déjà, mais les services vivent derrière une file d'attente / fil. Ce n'est pas un nouveau concept. Microsoft l'a fait avec DCOM / COM et a appelé ses objets actifs. Akka le fait avec les acteurs et les a appelés acteurs fortement tapés. Les concepts importants sont que vous obtenez la vitesse de la messagerie réactive et de style acteur, mais vous vous développez dans une approche POO naturelle. QBit n'est pas le premier. Qbit n'est pas le seul.
La vitesse QBIT est très rapide. Il y a bien sûr beaucoup de place à l'amélioration. Mais déjà 200m + TPS InProc Ping Pong, 10m-20m + tps événement Bus, 500k TPS RPC appelle WebSocket / JSON, etc. Plus de travail doit être fait pour améliorer la vitesse, mais maintenant c'est assez rapide où nous nous concentrons davantage sur la convivialité. Le support JSON utilise Boon par défaut, ce qui est jusqu'à 4x plus rapidement que les autres analyseurs JSON pour le cas d'utilisation REST / JSOND / JSON.
La programmation réactive QBIT fournit un réacteur pour gérer les appels asynchrones. Cela permet de gérer les rappels sur le même thread qui les a appelés et il prévoit le délai d'expiration et la gestion des erreurs. Lire le didacticiel du réacteur pour créer une programmation de micro-services réactive
Discovery de service Construit dans la prise en charge de la découverte de services. Cela comprend l'intégration avec le consul.
STATSERVICE PRÉSENTATION DANS LES STATES. Le statService peut être intégré à STATSD (Graphite, Grafana, Datadog, etc.) pour publier des statistiques passives. Ou vous pouvez interroger le moteur des statistiques et réagir aux statistiques (dénombrements, horaires et niveaux). Le STATSSERVICE est un système de statistiques réactif qui peut être cluster. Le statService est réactif en ce que vos services peuvent y publier et l'interroger et réagir en fonction des résultats. Vous pouvez mettre en œuvre des choses comme la limitation des taux et réagir à un taux accru de quelque chose. Le système Servicediscovery s'intègre au système de santé et au consul pour retrousser chacun de vos services internes qui composent votre micro-service et publient le composite disponible sur votre micro-service à un seul point de terminaison HTTP ou un commutateur de Mans Dead dans Consul (TTL).
Parler est bon marché. Regardons un code. Vous pouvez faire une promenade détaillée dans le wiki. Nous avons déjà beaucoup de documentation.
Nous créerons un service exposé via REST / JSON.
Pour interroger la taille de la liste TODO:
curl localhost:8080/services/todo-service/todo/countPour ajouter un nouvel élément TODO.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todoPour obtenir une liste d'articles TODO
curl http://localhost:8080/services/todo-service/todo/L'exemple TODO utilisera et suivra les éléments TODO.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;Le Todoservice utilise des annotations de style Spring MVC.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
} Vous pouvez publier / mettre des non-JSON et vous pouvez capturer le corps sous forme String ou comme un byte[] . Si le type de contenu est défini sur autre chose que application/json et que votre corps est défini une chaîne ou un octet []. Cela fonctionne automatiquement. (Le type de contenu doit être défini.)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
} Par défaut, QBIT envoie un 200 (ok) pour un appel non vide (un appel qui a un retour ou un rappel). Si l'opération de repos n'a pas de retour ou pas de rappel, QBIT envoie un 202 (accepté). Il peut y avoir des moments où vous souhaitez envoyer un 201 (créé) ou un autre code qui n'est pas une exception. Vous pouvez le faire en définissant code sur @RequestMapping . Par défaut, le code est -1, ce qui signifie utiliser le comportement par défaut (200 pour le succès, 202 pour le message unidirectionnel et 500 pour les erreurs).
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks peuvent également être utilisés pour les services internes. Il est souvent le cas que vous utilisez un CallbackBuilder ou un réacteur QBIT pour gérer les appels de service.
Vous n'avez pas à retourner des appels de repos JSON. Vous pouvez retourner n'importe quel texte binaire ou n'importe quel texte en utilisant HttpBinaryResponse et HttpTextResponse .
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}Pourquoi avons-nous choisi des annotations de style printemps?
Maintenant, commencez-le.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}C'est ça. Il y a également une prise en charge de la boîte WebSocket avec la génération de proxy côté client afin que vous puissiez appeler des services au taux de millions d'appels par seconde.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}Vous pouvez toujours invoquer des services QBIT via un proxy WebSocket. L'avantage d'un proxy WebSocket est qu'il vous permet d'exécuter 1M RPC + une seconde (1 million d'appels distants chaque seconde).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );La sortie est 3.
3
Ce qui précède utilise une interface proxy WebSocket pour appeler le service Async.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}Créez un client de service WebSocket qui est conscient de l'envoi.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" ); Actuellement, le clientBuilder chargera tous les points de terminaison de service enregistrés sous le nom du service et en choisissez au hasard un.
Servicediscovery comprend un consul basé sur le consul, regarder des fichiers JSON sur le disque et DNS. Il est également facile d'écrire votre propre découverte de services et de le brancher sur QBIT.
À l'avenir, nous pouvons les appels Roundrobin ou les appels à raccourci vers le service WebSocket et / ou fournir une échec automatique si la connexion est fermée. Nous le faisons pour le bus d'événements qui utilise la découverte de service, mais il n'est pas encore cuit dans les talons clients basés sur WebSocket.
Le dernier exemple client utilise WebSocket. Vous pouvez également utiliser le repos et utiliser réellement les paramètres d'URI que nous configurons. Le repos est agréable mais il va être plus lent que la prise en charge de WebSocket.
QBit est livré avec un joli petit client HTTP. Nous pouvons l'utiliser.
Vous pouvez l'utiliser pour envoyer des appels asynchrones et des messages WebSocket avec le client HTTP.
Ici, nous utiliserons le client HTTP pour invoquer notre méthode distante:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );La sortie est 4.
4
Vous pouvez également accéder au service depuis Curl.
$ curl http://localhost:7000/services/adder-service/add/2/2Voir cet exemple complet ici: QBIT Microservice Getting Startiorial.
Client QBIT URI Params et WebSocket
QBIT dispose d'une bibliothèque pour travailler avec et écrire des microservices asynchrones qui sont légers et amusants à utiliser.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Arrêtez maintenant le serveur et le client. Assez facile hein?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
L'accent est mis sur la facilité d'utilisation et l'utilisation de Java 8 Lambdas pour les rappels afin que le code soit serré et petit.
En savoir plus sur la prise en charge de la prise en charge du style microservice de QBIT ici
Maintenant, essayons notre client HTTP.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();Vous passez simplement l'URL, le port puis appelez.
Vous pouvez maintenant commencer à envoyer des demandes HTTP.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );Une réponse HTTP contient simplement les résultats du serveur.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}Il existe des méthodes d'assistance pour Sync HTTP Get Calls.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
La méthode PUTS est une méthode d'assistance qu'il fait plus ou moins System.out.println.
Les cinq premiers paramètres sont couverts. Au-delà de cinq, vous devez utiliser le httpbuilder.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );Il y a aussi des appels asynchronisés pour obtenir.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[En savoir plus sur le client HTTP Microservice facile à utiliser facile ici] (https://github.com/advantageous/qbit/wiki/%5BDOC%5D-Using-qbit-microservice-lib's-httpclient-get ,-post, -et-al, -json, -java-8-lambda).
QBIT permet également aux services derrière les files d'attente d'être exécutés dans PROC.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());Un tutoriel détaillé sur les services In-Proc est en cours d'écriture.
Exemple de bus d'événement QBIT plus détaillé
QBIT a également un bus d'événements de service. Cet exemple est un exemple de services aux avantages sociaux des employés.
Nous avons deux canaux.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Un objet employé ressemble à ceci:
public static class Employee {
final String firstName ;
final int employeeId ;Cet exemple a trois services: EmployeeHiringService, avantages sociaux et PayrollService.
Ces services sont des services InProc. QBIT prend également en charge WebSocket, HTTP et REST, mais pour l'instant, concentrons-nous sur les services InProc. Si vous comprenez InProc, vous comprendrez à distance.
L'EmployeeHiringService tire en fait les événements vers deux autres services.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}Notez que nous appelons SendarRay afin que nous puissions envoyer l'employé et leur salaire. L'auditeur de Payroll_Adjustment_Channel devra gérer à la fois un employé et un INT qui représente le salaire des nouveaux employés. Vous pouvez également utiliser des proxys de bus d'événements afin que vous n'ayez pas du tout à appeler dans le bus événementiel.
Le service avantageux écoute les nouveaux employés embauchés afin qu'il puisse les inscrire au système d'avantages sociaux.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}Papa doit être payé.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}L'employé est l'objet de l'employé de l'employeeHiringService.
Vous pouvez donc obtenir vos avantages et payer!
Trouvez plus de détails ici:
Exemple de bus d'événement QBIT plus détaillé
Vous pouvez définir votre propre interface avec le bus d'événements et vous pouvez utiliser vos propres bus d'événements avec QBIT. Chaque module de votre service peut avoir son propre bus d'événements internes.
Pour en savoir plus, lecture: QBIT Microservice travaillant avec un bus d'événements privé et un microservice JABA Microservice en utilisant votre propre interface avec le bus de l'événement.
Pour vraiment saisir QBIT, il faut saisir les concepts d'un rappel.
Un rappel est un moyen d'obtenir une réponse asynchrone dans QBIT.
Vous appelez une méthode de service et il vous rappelle.
Les proxys clients peuvent avoir des rappels:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}Les rappels sont des consommateurs Java 8 avec une gestion des erreurs supplémentaires en option.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}Les services qui peuvent bloquer doivent utiliser des rappels. Ainsi, si LoadUser bloquait dans l'exemple suivant, il devrait vraiment utiliser un rappel au lieu de renvoyer une valeur.
classe publique Recommandations pourvice {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
} Prétendons que loadUser doit regarder dans un cache local, et si l'utilisateur n'est pas trouvé, regardez dans un cache désactivé et si elle n'est pas trouvée, il doit demander à l'utilisateur de l'utilisateur qui doit vérifier ses caches et peut-être le calcul du chargement du chargement de la Données utilisateur d'une base de données ou d'autres services. En d'autres termes, loadUser peut potentiellement bloquer sur IO.
Notre client ne bloque pas, mais notre service le fait. Revenons à notre RecommendationService . Si nous obtenons beaucoup de coups de cache pour les charges des utilisateurs, le bloc ne sera peut-être pas si long, mais il sera là et chaque fois que nous devons reprocher chez un utilisateur, l'ensemble du système est gommé. Ce que nous voulons pouvoir faire, c'est que si nous ne pouvons pas gérer la demande de recommandation, nous allons de l'avant et faisons un appel asynchrone au UserDataService . Lorsque ce rappel asynchrone revient, nous traitons cette demande. En attendant, nous traitons les listes de recommandations les demandes le plus rapidement possible. Nous ne bloquons jamais.
Revoyons donc le service. La première chose que nous allons faire est de faire en sorte que la méthode de service prenne un rappel. Avant de le faire, fixons quelques règles.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {Maintenant, nous prenons un rappel et nous pouvons décider quand nous voulons gérer cette demande de génération de recommandations. Nous pouvons le faire immédiatement si les données utilisateur dont nous avons besoin sont en mémoire ou si nous pouvons les retarder.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
} Remarquez, si l'utilisateur est trouvé dans le cache, nous exécutons nos règles de recommandation en mémoire et appelons le rappel recommendationsCallback.accept(runRulesEngineAgainstUser(user)) .
La partie intéressante est que faisons-nous si nous n'avons pas chargé l'utilisateur.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
... Ici, nous utilisons un rappel pour charger l'utilisateur, et lorsque l'utilisateur est chargé, nous appelons handleLoadFromUserDataService qui ajoute une gestion de la gestion du rappel afin que nous puissions toujours gérer cet appel, tout simplement pas maintenant.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...L'utilisation de Lambdas comme celle-ci rend le code plus lisible et laconique, mais n'oubliez pas que ne nichez pas profondément les expressions de lambda ou vous créerez un cauchemar de maintenance de code. Utilisez-les judicieusement.
Ce que nous voulons, c'est de traiter la demande de recommandations après que le système de service utilisateur ait chargé l'utilisateur de son magasin.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
} La partie importante est que chaque fois que nous recevons un appel de rappel de UserDataService , nous effectuons ensuite nos règles de recommandation intensive de CPU et appelons notre appelant. Eh bien, pas exactement, ce que nous faisons, c'est que la mise en place de la file d'attente de nos rappels, et plus tard nous allons les itérer à travers ceux-ci mais quand?
Le RecommendationService peut être informé lorsque sa file d'attente est vide, elle a commencé un nouveau lot et lorsqu'il a atteint une limite de lots. Ce sont tous de bons moments pour gérer les rappels à partir de UserDataService .
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}Il est important de se rappeler lors de la gestion des rappels à partir d'un autre microservice que vous souhaitez gérer les rappels de l'autre service avant de gérer plus de demandes de votre clientèle. Essentiellement, vous avez des clients qui attendaient (Async Waiting mais quand même), et ces clients pourraient représenter une connexion TCP / IP ouverte comme un appel HTTP, il est donc préférable de les fermer avant de gérer plus de demandes et comme nous l'avons dit, ils attendaient déjà déjà Autant avec une connexion ouverte pour que les utilisateurs puissent former le service utilisateur.
Pour en savoir plus sur les rappels, Plesae Read [QBit Java Microservice Lib Rappel Fundamentals] ([Cut rugueuse] QBIT Microservice Lib travaillant avec les rappels).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...Vous pouvez composer les travailleurs fragmentés (pour la mémoire, le coffre-fort, les services à forte intensité de processeur) ou les travailleurs pour IO ou parler à des services étrangers ou à des bus étrangers.
Voici un exemple qui utilise un pool de travailleurs avec trois travailleurs de service:
Disons que vous avez un service qui fait quelque chose:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}Maintenant, cela fait une sorte d'IO et vous voulez avoir une banque de ces courses non seulement une, vous pouvez donc faire IO en parallèle. Après quelques tests de performances, vous avez découvert que trois est le nombre magique.
Vous souhaitez utiliser votre API pour accéder à ce service:
public interface MultiWorkerClient {
void doSomeWork (...);
}Créons maintenant une banque de ceux-ci et utilisons-le.
Créez d'abord les services QBIT qui ajoutent le thread / file d'attente / microbatch.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();Ajoutez-les maintenant à un objet travailleur de service.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workersVous pouvez ajouter des services, des pojos et des consommateurs de méthodes, des répartiteurs de méthodes à un bundle de services. Le bundle de services est un point d'intégration dans QBIT.
Ajoutons nos nouveaux travailleurs de service. Les travailleurs de service sont un ServiceMethodDispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Nous allons probablement ajouter une méthode d'assistance au bundle de services afin que la plupart de cela puisse se produire en un seul appel.
Vous pouvez maintenant commencer à utiliser vos travailleurs.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );Vous pouvez maintenant utiliser Spring ou Guice pour configurer les constructeurs et le bundle de services. Mais vous pouvez simplement le faire comme ce qui précède, ce qui est bon pour tester et comprendre les internes QBIT.
QBIT prend également en charge le concept de services Shardée, ce qui est bon pour les ressources de fragment comme le CPU (exécutez un moteur de règles sur chaque noyau de CPU pour un moteur de recommandation d'utilisateurs).
QBIT ne sait pas comment faire fléchir vos services, vous devez lui donner un indice. Vous le faites grâce à une règle de fragment.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}Nous avons travaillé sur une application où le premier argument aux services était le nom d'utilisateur, puis nous l'avons utilisé pour faire reculer les appels à un moteur de règles en mémoire intensif en processeur. Cette technique fonctionne. :)
La classe des travailleurs de service propose une méthode pour créer un pool de travailleurs frappé.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}Pour vous utiliser, passez simplement une clé de fragment lorsque vous créez les travailleurs du service.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});Ensuite, ajoutez vos services à la composition des travailleurs de service.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}Ajoutez-le ensuite au bundle de services comme précédemment.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Ensuite, utilisez-le:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}Le ShardonBeanPath vous permet de créer un appel de navigation de chemin de haricot complexe et d'utiliser sa propriété pour s'affronter.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );En savoir plus sur les travailleurs de la rupture des services et des services ici
Vous pouvez en trouver beaucoup plus dans le wiki. Suivez également les commits. Nous avons été des castors occupés. QBit La lib micro-service pour Java - JSON, REST, WebSocket.