QBIT Java MicorServices Lib Tutoriais | Site qbit | Qbit usa REAKT | QBIT funciona com vert.x | REAKT Vertx
O Java Microservice Lib. O QBIT é um LIB de programação reativa para a construção de microsserviços - JSON, HTTP, WebSocket e REST. O QBIT usa programação reativa para construir o REST ELASTIC e WebSockets, com base em nuvem, amigáveis para a Web. A SOA evoluiu para celular e nuvem. Descoberta de Serviço, Saúde, Serviço Reativo, Eventos, Programação Reativa Idiomática de Java para Microsserviços.
Tem uma pergunta? Pergunte aqui: QBIT GRUPO GOOGLE.
Tudo é uma fila. Você tem uma escolha. Você pode abraçá -lo e controlar. Você pode otimizar para isso. Ou você pode se esconder atrás das abstrações. O QBIT abre você para espiar o que está acontecendo e permite que você puxe algumas alavancas sem vender sua alma.
O QBIT é uma biblioteca, não uma estrutura. Você pode misturar e combinar o QBIT com a primavera, Guice, etc.
O QBIT agora suporta promessas REAKT Invokable para proxies de clientes locais e remotos. Isso fornece uma boa API fluente para programação assíncrona.
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();Os retornos de chamada QBIT agora também são retornos de chamada REAKT sem quebrar o contrato QBIT para retornos de chamada.
Veja as promessas invasores da REAKT para obter mais detalhes.
O QBIT é publicado no Repo Público do MAVEN.
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'Implantado em várias grandes empresas da Fortune 100. O QBIT agora funciona com Vertx (independente ou incorporado). Você também pode usar o QBIT em projetos não-QBIT, é apenas um LIB.
Apache 2
O QBIT possui serviços inproc, microsserviços e microsserviços WebSocket, além de um barramento de eventos de serviço no proco (que pode ser por módulo ou por aplicativo). Apoia trabalhadores e serviços na memória.
Antes de descrevermos mais, aqui estão dois serviços de amostra:
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}No final do dia, o QBIT é uma biblioteca simples, não uma estrutura. Seu aplicativo não é um aplicativo QBIT, mas um aplicativo Java que usa o QBIT LIB. O QBIT permite que você trabalhe com o Java UTIL simultâneo e não se esforça para escondê -lo de você. Apenas tentando tirar a picada disso.
Utilizamos técnicas no benefício e no QBIT com grande sucesso em aplicativos de alto desempenho e alto desempenho. Ajudamos os clientes a lidar com 10x a carga com 1/10 Os servidores de seus concorrentes usando técnicas no QBIT. O QBIT estamos cansando de acesso à fila e tópicos da fila.
As idéias para benefícios e qbit geralmente vêm de toda a web. Nós cometemos erros. Aponte -os. Como desenvolvedor de Boon e Qbit, somos companheiros de viagem. Se você tem uma ideia ou técnica que deseja compartilhar, ouvimos.
Uma grande inspiração para Boon/Qbit foi Vertx, Akka, GO canais, objetos ativos, encadeamento de modelos de apartamentos, ator e os papéis de simpatia mecânica.
O QBIT tem idéias semelhantes a muitas estruturas. Estamos todos lendo os mesmos papéis. O QBIT se inspirou nos documentos do LMAX Disruptor e desta postagem do blog sobre a fila de transferência de links versus disruptor. Tivemos algumas teorias sobre filas que a postagem do blog nos inspirou a experimentá -las. Algumas dessas teorias são implantadas em alguns dos maiores back -ends de middleware e cujas marcas de nome são conhecidas em todo o mundo. E assim Qbit nasceu.
O QBIT também se inspirou pelo excelente trabalho realizado por Tim Fox na Vertx. O primeiro projeto usando algo que poderia realmente ser chamado de QBIT (embora Early Qbit) estava usando o Vertx em um microsserviço da Web/Mobile para um aplicativo que poderia potencialmente ter 80 milhões de usuários. Foi essa experiência com a Vertx e o QBIT precoce que levou ao desenvolvimento e evolução do QBIT. O QBIT é construído sobre os ombros dos gigantes (Netty/Vertx).
Disruptor da primavera: Não. Spring Boot/Spring MVC: Não. Usamos as mesmas anotações, mas o QBIT é voltado para microsserviços na memória de alta velocidade. É mais como akka do que a bota da primavera. O QBIT possui um subconjunto dos recursos do Spring MVC voltado apenas para microsserviços, ou seja, WebSocket RPC, REST, JSON Marshaling, etc. Akka: Não. Bem, talvez. Akka tem conceitos semelhantes, mas eles adotam uma abordagem diferente. O QBIT está mais focado em Java e microsserviços (REST, JSON, WebSocket) do que Akka. LMAX Disruptor: Não. De fato, podemos usar o disruptor como nas filas que o QBIT usa embaixo das tampas.
(Os primeiros benchmarks foram removidos. Eles estavam aqui. Qbit ficou muito mais rápido. O benchmarking QBit é um alvo em movimento no momento. Links e relatórios serão criados.)
Exemplos de código
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)O QBIT é uma biblioteca de filas para microsserviços. É semelhante a muitos outros projetos, como Akka, Spring Reactor, etc. O QBIT é apenas uma biblioteca e não uma plataforma. O QBIT tem bibliotecas para colocar um serviço atrás de uma fila. Você pode usar filas QBIT diretamente ou criar um serviço. Os serviços QBIT podem ser expostos pelo WebSocket, HTTP, HTTP Pipeline e outros tipos de remoção. Um serviço no QBIT é uma classe Java cujos métodos são executados atrás das filas de serviço. O QBIT implementa a rosqueamento do modelo de apartamento e é semelhante ao modelo de ator ou uma melhor descrição seria objetos ativos. O QBIT não usa um disruptor (mas pode). Ele usa filas java regulares. O QBIT pode fazer o norte de 100 milhões de chamadas de pingue -pongue por segundo, o que é uma velocidade incrível (vista de 200m). O QBIT também suporta os serviços de chamada via REST e WebSocket. O QBIT é microsserviços no sentido puro da web: json, http, websocket, etc. QBIT usa micro lote para empurrar as mensagens através do tubo (fila, io, etc.) mais rapidamente para reduzir a transferência de roscas.
O QBIT é um Java Microservice Lib Supporting Rest, JSON e WebSocket. Está escrito em Java, mas poderíamos um dia escrever uma versão em ferrugem ou go ou c# (mas isso exigiria um grande dia de pagamento).
Serviço Pojo (objeto Java antigo simples) por trás de uma fila que pode receber chamadas de método por meio de chamadas ou eventos de proxy (pode ter um tópico de gerenciamento de eventos, chamadas de método e respostas ou duas para chamadas e eventos de métodos e outro para respostas, para que os manipuladores de resposta Não bloqueie o serviço. Os serviços podem usar as anotações de repouso no estilo MVC da primavera para se expor ao mundo exterior via REST e WebSocket.
ServiceBundle muitos pojos por trás da fila de uma resposta e muitos recebem filas. Pode haver um tópico para todas as respostas ou não. Eles também podem ser uma fila de recebimento.
Fila um fio gerenciando uma fila. Ele suporta lote. Possui eventos para vazio, alcançando, iniciado, inativo. Você pode ouvir esses eventos a partir de serviços que ficam atrás de uma fila. Você não precisa usar serviços. Você pode usar o direto da fila. No QBIT, você tem filas de remetentes e filas de receptores. Eles são separados para suportar microtatching.
ServiceEndPointServer ServiceBundle que é exposto à comunicação REST e WebSocket.
Eventbus EventBus é uma maneira de enviar muitas mensagens para serviços que podem ser vagamente acoplados.
ClientProxy ClientProxy é uma maneira de invocar o serviço através da interface assíncrona, o serviço pode ser inproc (mesmo processo) ou remoto no WebSocket.
O QBIT não bloqueador é um LIB não bloqueador. Você usa retornos de chamada via Java 8 Lambdas. Você também pode enviar mensagens de evento e obter respostas. As mensagens são incorporadas ao sistema para que você possa coordenar facilmente tarefas complexas. O QBIT adota uma abordagem orientada a objetos para o desenvolvimento de serviços, para que os serviços pareçam serviços Java normais que você já escreve, mas os serviços vivem atrás de uma fila/thread. Este não é um conceito novo. A Microsoft fez isso com DCOM/COM e chamou de objetos ativos. Akka faz isso com os atores e os chamou de atores digitados fortemente. Os conceitos importantes é que você obtém a velocidade das mensagens reativas e de estilo de ator, mas se desenvolve em uma abordagem natural de OOP. Qbit não é o primeiro. Qbit não é o único.
A velocidade qbit é muito rápida. É claro que há muito espaço para melhorias. Mas já o barramento de eventos de 10m+ tps inproc ping, 10m-20m+ TPS Event, 500K TPS RPC chamadas sobre o WebSocket/JSON, etc. Mais trabalho precisa ser feito para melhorar a velocidade, mas agora é rápido o suficiente onde estamos nos concentrando mais na usabilidade. O JSON Support usa o BOON por padrão, que é até 4x mais rápido que outros analisadores JSON para o restante/json, WebSocket/JSON Use Case.
Programação reativa O QBIT fornece um reator para gerenciar chamadas assíncronas. Isso permite que os retornos de chamada sejam manuseados no mesmo thread que os chamava e fornece tempo limite e manuseio de erros. Leia o tutorial do reator para criar programação de micro serviço reativo
Descoberta de serviço incorporada em suporte para descoberta de serviços. Isso inclui integração com cônsul.
StatService incorporado em suporte para estatísticas. O STATSERVICE pode ser integrado ao STATSD (Grafite, Grafana, Datadog, etc.) para publicar estatísticas passivas. Ou você pode consultar o mecanismo de estatísticas e reagir às estatísticas (contagens, horários e níveis). O STATSSERVICE é um sistema de estatísticas reativas que pode ser agrupado. O STATSERVICE é reativo, pois seus serviços podem publicá -lo e consultar e reagir com base nos resultados. Você pode implementar coisas como limitar a taxa e reagir a um aumento da taxa de algo. O sistema de descoberta de serviços integra -se ao HealthSystem e ao cônsul para enrolar cada um de seus serviços internos que compõem o Micro Service e publicam o composto disponibilizando seu micro serviço em um único endpoint HTTP ou a um interruptor morto no cônsul (TTL).
A conversa é barata. Vejamos algum código. Você pode obter uma caminhada detalhada no wiki. Já temos muita documentação.
Criaremos um serviço exposto através do REST/JSON.
Para consultar o tamanho da lista de TODO:
curl localhost:8080/services/todo-service/todo/countPara adicionar um novo item de TODO.
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todoPara obter uma lista de itens de TODO
curl http://localhost:8080/services/todo-service/todo/O exemplo TODO usará e rastreará itens de TODO.
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;O Todoservice usa anotações no estilo MVC da primavera.
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
} Você pode postar/colocar não json e capturar o corpo como uma String ou como um byte[] . Se o tipo de conteúdo estiver definido como qualquer coisa, exceto application/json e seu corpo for definido uma string ou byte []. Isso funciona automaticamente. (O tipo de conteúdo deve ser definido.)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
} Por padrão, o QBIT envia um 200 (OK) para uma chamada não-VOID (uma chamada que tem uma devolução ou um retorno de chamada). Se a operação restante não tiver retorno ou retorno de chamada, o QBIT enviará um 202 (aceito). Pode haver momentos em que você deseja enviar um 201 (criado) ou algum outro código que não seja uma exceção. Você pode fazer isso definindo code no @RequestMapping . Por padrão, o código é -1, o que significa usar o comportamento padrão (200 para sucesso, 202 para mensagem unidirecional e 500 para erros).
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks também podem ser usados para serviços internos. Muitas vezes, é o caso de você usar um retorno de chamada ou um reator QBIT para gerenciar chamadas de serviço.
Você não precisa devolver chamadas de descanso do formulário JSON. Você pode retornar qualquer binário ou qualquer texto usando HttpBinaryResponse e HttpTextResponse .
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}Por que escolhemos anotações no estilo da primavera?
Agora apenas comece.
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}É isso. Também há suporte para a geração de proxy do lado do cliente para que você possa entrar em serviços à taxa de milhões de chamadas por segundo.
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}Você sempre pode invocar os serviços QBIT por meio de um proxy do WebSocket. A vantagem de um proxy do WebSocket é que você permite executar 1M RPC+ um segundo (1 milhão de chamadas remotas a cada segundo).
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );A saída é 3.
3
O acima usa uma interface do WebSocket Proxy para chamar o serviço de assíncrono.
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}Crie o WebSocket Service Client que esteja atendendo a uma descoberta.
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" ); Atualmente, o clientBuilder carregará todos os pontos de extremidade de serviço registrados no nome do serviço e escolherá aleatoriamente um.
O serviço de serviço inclui baseado em cônsul, assistindo arquivos JSON no disco e DNS. É fácil escrever sua própria descoberta de serviço e conectá -la ao QBIT.
No futuro, podemos chamar chamadas de RoundRobin ou shard para WebSocket Service e/ou fornecer falhar automaticamente se a conexão estiver fechada. Fazemos isso para o barramento de eventos que usa a descoberta de serviços, mas ele ainda não está assado em stubs de clientes baseados em WebSocket.
O último exemplo do cliente usa o WebSocket. Você também pode usar o REST e realmente usar os parâmetros URI que configuramos. O descanso é bom, mas será mais lento que o suporte do WebSocket.
QBIT Navios com um bom cliente HTTP. Podemos usá -lo.
Você pode usá -lo para enviar chamadas assíncronas e mensagens do WebSocket com o cliente HTTP.
Aqui usaremos o cliente HTTP para invocar nosso método remoto:
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );A saída é 4.
4
Você também pode acessar o serviço a partir do CURL.
$ curl http://localhost:7000/services/adder-service/add/2/2Veja este exemplo completo aqui: qbit microsserviço Tutorial para iniciar.
QBIT URI params e WebSocket Proxy Client
O QBIT possui uma biblioteca para trabalhar e escrever microsserviços assíncronos, leve e divertido de usar.
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
Agora pare o servidor e o cliente. Muito fácil eh?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
O foco está na facilidade de uso e no uso de Java 8 Lambdas para retornos de chamada para que o código seja apertado e pequeno.
Saiba mais sobre o suporte do WebSocket no estilo MicrosService do QBIT aqui
Agora, vamos experimentar nosso cliente HTTP.
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();Você apenas passa pelo URL, pela porta e depois liga para iniciar.
Agora você pode começar a enviar solicitações HTTP.
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );Uma resposta HTTP contém apenas os resultados do servidor.
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}Existem métodos auxiliares para sincronizar o HTTP Get Chamadas.
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
O método puts é um método auxiliar que ele faz System.out.println mais ou menos a propósito.
Os cinco primeiros parâmetros são cobertos. Além das cinco, você deve usar o httpbuilder.
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );Existem chamadas assíncronas para obter também.
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[Encontre mais sobre o cliente http de microsserviço rápido e fácil de usar aqui] (https://github.com/advantageous/qbit/wiki/%5bdoc%5d-using-qbit-microservice-lib's-httpclient-get, post, -et-al, -json, -java-8-lambda).
O QBIT permite que os serviços por trás das filas também sejam executados no Proc.
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());O tutorial detalhado sobre serviços no processo está sendo escrito.
Qbit event barramento mais detalhado exemplo
O QBIT também possui um barramento de eventos de serviço. Este exemplo é um exemplo de serviço de benefícios de funcionários.
Temos dois canais.
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
Um objeto de funcionário se parece com o seguinte:
public static class Employee {
final String firstName ;
final int employeeId ;Este exemplo possui três serviços: funcionáriohiringService, benefícios Service e PayrollService.
Esses serviços são serviços inproc. O QBIT também suporta serviços remotos WebSocket, HTTP e REST, mas, por enquanto, vamos nos concentrar nos serviços inproc. Se você entender o Inproc, entenderá o controle remoto.
O funcionáriohiringService realmente dispara os eventos para outros dois serviços.
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}Observe que chamamos o SendArray para que possamos enviar o funcionário e seu salário. O ouvinte do Payroll_Adjustment_Channel terá que lidar com um funcionário e um INT que representa o salário dos novos funcionários. Você também pode usar proxies de barramento de eventos para não precisar ligar para o barramento de eventos.
O serviço de benefícios ouve para novos funcionários contratados para que possam inscrever -os no sistema de benefícios.
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}Papai precisa ser pago.
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}O funcionário é o objeto do funcionário do funcionáriohiringService.
Então você pode obter seus benefícios e pagar!
Encontre mais detalhes aqui:
Qbit event barramento mais detalhado exemplo
Você pode definir sua própria interface para o barramento de eventos e usar seus próprios barramentos de eventos com o QBIT. Cada módulo em seu serviço pode ter seu próprio barramento de eventos internos.
Para saber mais leitura: QBIT Microservice trabalhando com um barramento de eventos privados e QBIT Java Microservice Lib usando sua própria interface no barramento de eventos.
Para realmente entender o QBIT, é preciso entender os conceitos de um retorno de chamada.
Um retorno de chamada é uma maneira de obter uma resposta assíncrona no QBIT.
Você chama um método de serviço e ele liga de volta.
Os proxies do cliente podem ter retornos de chamada:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}Os retornos de chamada são Java 8 consumidores com algum manuseio extra de erros opcionais.
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}Os serviços que podem bloquear devem usar retornos de chamada. Assim, se o LoadUser bloqueado no exemplo a seguir, ele realmente deve usar um retorno de chamada em vez de retornar um valor.
classe pública Recomendationservice {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
} Vamos fingir que loadUser deve procurar em um cache local e, se o usuário não for encontrado, procure em um cache fora do chicote e, se não for, ele deve pedir o usuário do Serviço de Usuários que deve verificar seus caches e talvez o fallback para carregar o Dados do usuário de um banco de dados ou de outros serviços. Em outras palavras, loadUser pode potencialmente bloquear o IO.
Nosso cliente não bloqueia, mas nosso serviço o faz. Voltando ao nosso RecommendationService . Se obtivermos muitos acertos de cache para cargas de usuário, talvez o bloco não demore muito, mas estará lá e toda vez que precisamos culpar em um usuário, todo o sistema está enlouquecido. O que queremos poder fazer é que, se não conseguirmos lidar com a solicitação de recomendação, avançamos e fazemos uma chamada assíncrona para o UserDataService . Quando esse retorno de chamada assíncrona volta, lidamos com essa solicitação. Nesse meio tempo, lidamos com a recomendação das solicitações o mais rápido possível. Nós nunca bloqueamos.
Então, vamos revisitar o serviço. A primeira coisa que vamos fazer é fazer o método de serviço receber um retorno de chamada. Antes de fazer isso, vamos definir algumas regras.
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {Agora estamos recebendo um retorno de chamada e podemos decidir quando queremos lidar com essa solicitação de geração de recomendação. Podemos fazê-lo imediatamente se os dados do usuário de que precisamos estiver na memória ou podemos atrasá-los.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
} Observe que, se o usuário for encontrado no cache, executamos nossas regras de recomendação na memória e chamamos o retorno de chamada imediatamente recommendationsCallback.accept(runRulesEngineAgainstUser(user)) .
A parte interessante é o que fazemos se não tiver o usuário carregado.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
... Aqui usamos um retorno de chamada para carregar o usuário e, quando o usuário é carregado, chamamos handleLoadFromUserDataService o que adiciona algum gerenciamento sobre o manuseio do retorno de chamada para que ainda possamos lidar com essa chamada, mas não agora.
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...O uso de lambdas como esse torna o código mais legível e conciso, mas lembre -se de não nidar profundamente as expressões lambda ou você criará um pesadelo de manutenção de código. Use -os criteriosamente.
O que queremos é lidar com a solicitação de recomendações depois que o sistema de serviço do usuário carrega o usuário em sua loja.
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
} A parte importante é que toda vez que recebemos uma chamada de retorno do UserDataService , realizamos nossas regras de recomendação intensiva da CPU e o retorno de chamada nosso chamador. Bem, não exatamente, o que fazemos é envolver um executável na fila de retornos de chamada e, mais tarde, iremos iterar por isso, mas quando?
O RecommendationService pode ser notificado quando sua fila estiver vazia, inicia um novo lote e quando atingir um limite de lote. Todos esses são bons tempos para lidar com retornos de chamada do UserDataService .
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}É importante lembrar ao lidar com retornos de chamada de outro microsserviço que você deseja lidar com retornos de chamada do outro serviço antes de lidar com mais solicitações de incorporação de seus clientes. Essencialmente, você tem clientes que estavam esperando (assíncrona esperando, mas ainda assim), e esses clientes podem representar uma conexão TCP/IP aberta como uma chamada HTTP, por isso é melhor fechá -los antes de lidar com mais solicitações e, como dissemos, eles já estavam esperando Com uma conexão aberta para os usuários carregarem o serviço de usuário.
Para saber mais sobre os retornos de chamada, Plesae leu [QBIT Java Microservice Lib Fundamentals Rettemorats] ([Corte AGRUPO] QBIT Microservice Lib trabalhando com retornos de chamada).
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...Você pode compor trabalhadores fragmentados (para serviços intensivos na memória, seguros de threads, CPU) ou trabalhadores para IO ou conversando com serviços estrangeiros ou ônibus estrangeiros.
Aqui está um exemplo que usa um pool de trabalhadores com três trabalhadores de serviço:
Digamos que você tenha um serviço que faça algo:
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}Agora, isso faz algum tipo de IO e você quer ter um banco destes em execução não apenas um para que você possa fazer IO em paralelo. Após alguns testes de desempenho, você descobriu que três é o número mágico.
Você deseja usar sua API para acessar este serviço:
public interface MultiWorkerClient {
void doSomeWork (...);
}Agora vamos criar um banco e usá -lo.
Primeiro, crie os serviços QBIT que adicionem o thread/fila/microbatch.
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();Agora adicione -os a um objeto de trabalhadores de serviço.
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workersVocê pode adicionar serviços, poços e consumidores de métodos, despachantes de métodos a um pacote de serviços. O pacote de serviço é um ponto de integração no QBIT.
Vamos adicionar nossos novos trabalhadores de serviço. ServiceWorkers é um servicemethoddispatcher.
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Provavelmente vamos adicionar um método auxiliar ao pacote de serviços para que a maior parte disso possa acontecer em uma única chamada.
Agora você pode começar a usar seus trabalhadores.
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );Agora você pode usar a primavera ou o Guice para configurar os construtores e o pacote de serviços. Mas você pode fazer isso como o acima, o que é bom para testar e entender os internos do QBIT.
O QBIT também suporta o conceito de serviços sharded, que é bom para recursos de fragmentação como a CPU (execute um mecanismo de regras em cada núcleo da CPU para um mecanismo de recomendação de usuário).
O QBIT não sabe como destruir seus serviços, você precisa dar uma dica. Você faz isso através de uma regra de Shard.
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}Trabalhamos em um aplicativo em que o primeiro argumento dos serviços era o nome de usuário e, em seguida, usamos isso para encharcar chamadas para um mecanismo intensivo de regras na memória da CPU. Esta técnica funciona. :)
A classe ServiceWorkers tem um método para criar um pool de trabalhadores fragmentados.
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}Para usar você, basta passar por uma chave de fragmento quando você cria os funcionários do serviço.
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});Em seguida, adicione seus serviços à composição dos serviços de serviço.
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}Em seguida, adicione -o ao pacote de serviço como antes.
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();Em seguida, basta usá -lo:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}O shardonbeanpath permite que você crie uma chamada complexa de navegação de caminho do feijão e use sua propriedade para encharcar.
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );Leia mais sobre os trabalhadores de sharding e serviço de serviço aqui
Você pode encontrar muito mais no wiki. Siga também os compromissos. Estamos ocupados castores. QBIT O MICROSERVICE LIB PARA JAVA - JSON, REST, WEBSOCKET.