Qbit Java Micorservices libチュートリアル| Qbit Webサイト| Qbitはreakt |を使用しますQbitはvert.xで動作しますReakt Vertx
Java Microservice lib。 QBITは、JSON、HTTP、WebSocket、およびRESTのマイクロサービスを構築するためのリアクティブプログラミングLIBです。 QBITは、リアクティブなプログラミングを使用して、弾力性のある休息とWebSocketsベースのクラウドに優しいWebサービスを構築します。 SOAはモバイルとクラウドのために進化しました。 ServicedIscovery、Health、Reactive Statservice、イベント、マイクロサービス用のJava慣用反応プログラミング。
質問がありますか?ここで質問してください:Qbit Googleグループ。
すべてがキューです。あなたには選択肢があります。あなたはそれを受け入れて制御することができます。最適化できます。または、抽象化の後ろに隠れることができます。 Qbitは、何が起こっているのかを覗くようになり、魂を売らずにレバーを引っ張ることができます。
Qbitはフレームワークではなくライブラリです。 QBITとSpring、Guiceなどを混ぜて一致させることができます。
QBITは、ローカルおよびリモートクライアントのプロキシに対するReakt Invokable Promisesをサポートするようになりました。これにより、Asyncプログラミング用の素敵なFluent APIが提供されます。
employeeService . lookupEmployee ( "123" )
. then (( employee )-> {...}). catchError (...). invoke ();QBITコールバックは、コールバックのQBIT契約を破ることなく、リークコールバックにもなりました。
詳細については、Reakt Invokable Promisesを参照してください。
QbitはMaven Public Repoに公開されています。
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-admin</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency >
< dependency >
< groupId >io.advantageous.qbit</ groupId >
< artifactId >qbit-vertx</ artifactId >
< version >1.10.0.RELEASE</ version >
</ dependency > compile 'io.advantageous.qbit:qbit-admin:1.10.0.RELEASE'
compile 'io.advantageous.qbit:qbit-vertx:1.10.0.RELEASE'いくつかの大手100社に展開されました。 QbitはVertx(スタンドアロンまたは埋め込み)で動作します。 qbitプロジェクトでqbitを使用することもできます。これは単なるlibです。
Apache 2
QBITには、InProcサービス、RESTマイクロサービス、WebSocket Microservices、およびInProcサービスイベントバス(モジュールごとまたはアプリごとに可能)があります。労働者とインメモリサービスをサポートします。
もっと説明する前に、2つのサンプルサービスを次に示します。
@ RequestMapping ( "/todo-service" )
public class TodoService {
@ RequestMapping ( "/todo/count" )
public int size () {...
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {... @ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {...
}一日の終わりには、Qbitはフレームワークではなく単純なライブラリです。アプリはQBITアプリではなく、QBIT LIBを使用するJavaアプリです。 QBITを使用すると、Java Util Concurrentを使用することができ、それを隠そうと努力しません。それから刺し傷を取り除こうとしています。
BoonとQBITのテクニックを使用して、ハイエンド、高性能、高スケーラブルなアプリで大成功を収めました。クライアントは、QBITのテクニックを使用して、競合他社のサーバー1/10で10倍の負荷を処理するのを支援しました。 Qbitは、キューのアクセスとスレッドのチューニングにうんざりしていることです。
BoonとQbitのアイデアは、多くの場合、Web全体から生まれます。私たちは間違いを犯します。それらを指摘します。 BoonとQbitの開発者として、私たちは仲間の旅行者です。共有したいアイデアやテクニックがある場合は、聞きます。
BOON/QBITの大きなインスピレーションは、Vertx、Akka、Go Channels、Active Objects、Apartment Model Sthreading、Actor、およびMechanical Simmal Papersでした。
Qbitには、多くのフレームワークに似たアイデアがあります。私たちは皆、同じ論文を読んでいます。 Qbitは、LMAX Disruptor Papersと、リンク転送キューとDisruptorに関するこのブログ投稿からインスピレーションを得ました。私たちは、ブログ投稿が私たちにそれらを試してみるように促したキューについていくつかの理論を持っていました。これらの理論のいくつかは、最大のミドルウェアバックエンドのいくつかに展開されており、その名前ブランドは世界中で知られています。したがって、Qbitが生まれました。
Qbitはまた、VertxでTim Foxが行った素晴らしい仕事に多くのインスピレーションを与えました。実際にQBIT(初期QBITではあるが)と呼ばれるものを使用する最初のプロジェクトは、8,000万人のユーザーがいる可能性のあるアプリにWeb/モバイルマイクロサービスでVERTXを使用していました。 QBITの開発と進化につながったのは、VERTXと初期のQBITでのこの経験でした。 Qbitは、巨人の肩(Netty/Vertx)の上に構築されています。
Spring Disruptor:いいえ。SpringDisruptor用のプラグインを作成するためにQbitを使用できますが、QbitはSpring Disruptorと競合していません。 Spring Boot/Spring MVC:いいえ。同じ注釈を使用していますが、QBITは高速インメモリマイクロサービス用に調整されています。 Spring BootよりもAkkaのようなものです。 QBITには、マイクロサービス、つまりWebSocket RPC、REST、JSON MARSHALINGなどにのみ調整されたスプリングMVCの機能のサブセットがあります。Akka:いいえ。 Akkaには同様の概念がありますが、別のアプローチをとっています。 Qbitは、AkkaよりもJava、およびMicroservices(REST、JSON、WebSocket)により焦点を合わせています。 LMAX DISRAMSOR:いいえ。実際、QBITがカバーの下で使用するキューのように、Disruptorを使用できます。
(初期のベンチマークは削除されました。彼らはここにいました。Qbitはより速くなりました。ベンチマークQBITは現時点では移動ターゲットです。リンクとレポートが作成されます。)
コードの例
====
BasicQueue < Integer > queue = BasicQueue . create ( Integer . class , 1000 );
//Sending threads
SendQueue < Integer > sendQueue = queue . sendQueue ();
for ( int index = 0 ; index < amount ; index ++) {
sendQueue . send ( index );
}
sendQueue . flushSends ();
...
sendQueue . sendAndFlush ( code );
//other methods for sendQueue, writeBatch, writeMany
//Receiving Threads
ReceiveQueue < Integer > receiveQueue = queue . receiveQueue ();
Integer item = receiveQueue . take ();
//other methods poll(), pollWait(), readBatch(), readBatch(count)Qbitは、マイクロサービス用のキューイングライブラリです。 Akka、Spring Reactorなどの他の多くのプロジェクトに似ています。Qbitは、プラットフォームではなくライブラリです。 Qbitには、キューの後ろにサービスを設置するライブラリがあります。 Qbitキューを直接使用するか、サービスを作成できます。 QBITサービスは、WebSocket、HTTP、HTTPパイプライン、およびその他の種類のリモートによって公開できます。 QBITのサービスは、サービスキューの背後にメソッドが実行されるJavaクラスです。 QBITはアパートモデルのスレッドを実装し、アクターモデルに似ているか、より良い説明はアクティブなオブジェクトです。 QBITは破壊者を使用しません(ただし、可能性があります)。通常のJavaキューを使用します。 QBITは、1秒あたり1億個のPing Pongコールの北を実行できます。これは驚くべき速度です(200mと見なされます)。 QBITは、RESTおよびWebSocketを介した通話サービスもサポートしています。 QBITは、JSON、HTTP、WebSocketなどの純粋なWebセンスのマイクロサービスです。QBITはマイクロバッチを使用して、パイプ(キュー、IOなど)を介してメッセージをプッシュしてスレッドハンドオフを減らします。
Qbitは、REST、JSON、WebSocketをサポートするJava MicroService Libです。 Javaで書かれていますが、いつかRustまたはGoまたはC#でバージョンを書くことができます(ただし、それには大きな給料日が必要です)。
プロキシコールまたはイベントを介してメソッドコールを受信できるキューの背後にあるサービスPOJO(プレーンオールドジャワオブジェクト)(1つのスレッド管理イベント、メソッド呼び出し、およびメソッドコールとイベントの場合、1つは1つ、もう1つは応答ハンドラーを使用することができます。応答がブロックされない限り、サービスをブロックしないでください)。サービスは、Spring MVCスタイルのRESTアノテーションを使用して、RESTとWebSocketを介して外の世界にさらされます。
Service -Bundle 1つの応答キューの背後に多くのポジョーがあり、多くはキューを受け取ります。すべての応答のための1つのスレッドがあるかどうか。また、キューを受信することもできます。
キューキューを管理するスレッド。バッチをサポートします。空、ReachLimit、StartingBatch、Idleのイベントがあります。キューの後ろにあるサービスからこれらのイベントを聴くことができます。サービスを使用する必要はありません。 Queueのダイレクトを使用できます。 QBITでは、送信者キューとレシーバーキューがあります。それらはマイクロバッチングをサポートするために分離されています。
ServiceEndPointServer ServiceBundleは、RESTおよびWebSocking通信にさらされています。
EventBus EventBusは、大まかに結合される可能性のあるサービスに多くのメッセージを送信する方法です。
ClientProxy ClientProxyは、Asyncインターフェイスを介してサービスを呼び出す方法です。サービスは、WebSocketで移動するか、リモートすることができます。
非ブロッキングQBITは、非ブロッキングLIBです。 Java 8 Lambdasを介してコールバックを使用します。イベントメッセージを送信して返信することもできます。メッセージングはシステムに組み込まれているため、複雑なタスクを簡単に調整できます。 QBITは、サービス開発に対するオブジェクト指向のアプローチを取っているため、サービスは既に書いている通常のJavaサービスのように見えますが、サービスはキュー/スレッドの後ろに住んでいます。これは新しい概念ではありません。 MicrosoftはDCOM/COMでこれを行い、Active Objectsと呼びました。 Akkaは俳優と一緒にそれを行い、彼らを強くタイプした俳優と呼びました。重要な概念は、リアクティブで俳優スタイルのメッセージングの速度を取得しますが、自然なOOPアプローチで開発することです。 Qbitは最初ではありません。 Qbitは唯一ではありません。
Speed QBitは非常に高速です。もちろん、改善の余地がたくさんあります。しかし、すでに200m+ TPSインプロックピンポン、10m-20m+ TPSイベントバス、500K TPS RPCコールをWebSocket/JSONなどに通しています。 JSONサポートは、デフォルトでBOONを使用します。これは、REST/JSON、WebSocket/JSONユースケースの他のJSONパーサーよりも最大4倍高速です。
リアクティブプログラミングQBITは、非同期呼び出しを管理するための原子炉を提供します。これにより、コールバックを呼び出した同じスレッドで処理することができ、タイムアウトとエラー処理を提供します。リアクティブマイクロサービスプログラミングを作成するための原子炉チュートリアルをお読みください
サービスの発見をサポートして構築されたサービスディスカバリー。これには、Consulとの統合が含まれます。
統計を構築して統計をサポートします。 STATSERVICEをSTATSD (Graphite、Grafana、datadogなど)と統合して、パッシブ統計を公開できます。または、統計エンジンを照会し、統計(カウント、タイミング、レベル)に反応することができます。 StatsServiceは、クラスター化できるリアクティブな統計システムです。 StatServiceは、サービスが発行し、それを照会し、結果に基づいて反応できるという点で反応します。レートの制限などを実装し、何かの増加率に対応できます。 ServicedIscoveryシステムは、HealthSystemおよびConsulと統合して、マイクロサービスを構成する各内部サービスをロールアップし、マイクロサービスの複合を単一のHTTPエンドポイントまたはConsulのDead Mans Switch(TTL)に公開します。
話は安いです。いくつかのコードを見てみましょう。 Wikiで詳細な散歩をすることができます。すでに多くのドキュメントがあります。
REST/JSONを介して公開されるサービスを作成します。
TODOリストのサイズを照会するには:
curl localhost:8080/services/todo-service/todo/count新しいTODOアイテムを追加します。
curl -X POST -H " Content-Type: application/json " -d
' {"name":"xyz","description":"xyz"} '
http://localhost:8080/services/todo-service/todoTODOアイテムのリストを取得するには
curl http://localhost:8080/services/todo-service/todo/TODOの例は、TODOアイテムを使用および追跡します。
package io . advantageous . qbit . examples ;
import java . util . Date ;
public class TodoItem {
private final String description ;
private final String name ;
private final Date due ;Todoserviceは、Spring MVCスタイルの注釈を使用しています。
@ RequestMapping ( "/todo-service" )
public class TodoService {
private List < TodoItem > todoItemList = new ArrayList <>();
@ RequestMapping ( "/todo/count" )
public int size () {
return todoItemList . size ();
}
@ RequestMapping ( "/todo/" )
public List < TodoItem > list () {
return todoItemList ;
}
@ RequestMapping ( value = "/todo" , method = RequestMethod . POST )
public void add ( TodoItem item ) {
todoItemList . add ( item );
}
}非jsonを投稿/配置することができ、体をStringまたはbyte[] 。コンテンツタイプがapplication/json以外のものに設定され、体が文字列またはバイト[]が定義されている場合。これは自動的に機能します。 (コンテンツタイプを設定する必要があります。)
@ RequestMapping ( value = "/body/bytes" , method = RequestMethod . POST )
public boolean bodyPostBytes ( byte [] body ) {
String string = new String ( body , StandardCharsets . UTF_8 );
return string . equals ( "foo" );
}
@ RequestMapping ( value = "/body/string" , method = RequestMethod . POST )
public boolean bodyPostString ( String body ) {
return body . equals ( "foo" );
}デフォルトでは、QBITはボイド以外のコールのために200 (OK)を送信します(返品またはコールバックがあるコール)。 REST操作に戻りまたはコールバックがない場合、QBITは202 (受け入れられた)を送信します。例外ではない201(作成)または他のコードを送信する場合があります。 @RequestMappingにcode設定することで、それを行うことができます。デフォルトでは、コードは-1です。つまり、デフォルトの動作を使用します(成功するには200、一方向メッセージで202、エラーには500)。
@ RequestMapping ( value = "/helloj7" , code = 221 )
public void helloJSend7 ( Callback < JSendResponse < List < String >>> callback ) {
callback . returnThis ( JSendResponseBuilder . jSendResponseBuilder ( Lists . list (
"hello " + System . currentTimeMillis ())). build ());
} Callbacks内部サービスにも使用できます。多くの場合、CallBackBuilderまたはQBITリアクターを使用してサービスコールを管理する場合があります。
JSONフォームレストコールを返す必要はありません。 HttpBinaryResponseおよびHttpTextResponse使用して、任意のバイナリまたは任意のテキストを返すことができます。
@ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpTextResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildTextResponse ());
} @ RequestMapping ( method = RequestMethod . GET )
public void ping2 ( Callback < HttpBinaryResponse > callback ) {
callback . resolve ( HttpResponseBuilder . httpResponseBuilder ()
. setBody ( "hello mom" ). setContentType ( "mom" )
. setCode ( 777 )
. buildBinaryResponse ());
}なぜ春のスタイルの注釈を選んだのですか?
今すぐ開始してください。
public static void main ( String ... args ) {
ServiceEndpointServer server = new EndpointServerBuilder (). build ();
server . initServices ( new TodoService ());
server . start ();
}それだけです。また、クライアント側のプロキシ生成を備えたBox Websocketサポートもありますので、毎秒数百万の通話率でサービスに電話をかけることができます。
@ RequestMapping ( "/adder-service" )
public class AdderService {
@ RequestMapping ( "/add/{0}/{1}" )
public int add ( @ PathVariable int a , @ PathVariable int b ) {
return a + b ;
}
}WebSocketプロキシを介していつでもQBITサービスを呼び出すことができます。 Websocketプロキシの利点は、1秒間に1M RPC+を実行できることです(毎秒100万個のリモートコール)。
/* Start QBit client for WebSocket calls. */
final Client client = clientBuilder ()
. setPort ( 7000 ). setRequestBatchSize ( 1 ). build ();
/* Create a proxy to the service. */
final AdderServiceClientInterface adderService =
client . createProxy ( AdderServiceClientInterface . class ,
"adder-service" );
client . start ();
/* Call the service */
adderService . add ( System . out :: println , 1 , 2 );出力は3です。
3
上記では、WebSocketプロキシインターフェイスを使用して、サービスアナイクを呼び出します。
interface AdderServiceClientInterface {
void add ( Callback < Integer > callback , int a , int b );
}ServicedIscoveryのWebSocket Serviceクライアントを作成します。
final Client client = clientBuilder . setServiceDiscovery ( serviceDiscovery , "echo" )
. setUri ( "/echo" ). setProtocolBatchSize ( 20 ). build (). startClient ();
final EchoAsync echoClient = client . createProxy ( EchoAsync . class , "echo" );現在、 clientBuilderサービス名の下に登録されているすべてのサービスエンドポイントをロードし、ランダムに選択します。
ServicedIscoveryには、DISK上のJSONファイルを監視し、DNSを監視しているConsulベースが含まれます。独自のサービスの発見も簡単に書いて、それをQBITに接続するのは簡単です。
将来的には、接続が閉じられている場合、WebSocketサービスの呼び出しまたはSHARD CALLOをWebSocket Serviceまたは/またはAutoの失敗を提供することができます。これは、サービスディスカバリーを使用するイベントバスでこれを行いますが、Websocketベースのクライアントスタブにまだ焼かれていません。
最後のクライアントの例では、WebSocketを使用します。また、RESTを使用するだけで、実際にセットアップするURI Paramsを使用できます。休憩はいいですが、Websocketのサポートよりも遅くなるでしょう。
Qbitは、素敵な小さなHTTPクライアントを備えた出荷です。使用できます。
これを使用して、HTTPクライアントとともにAsync CallsとWebSocketメッセージを送信できます。
ここでは、HTTPクライアントを使用して、リモートメソッドを呼び出します。
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" )
. setPort ( 7000 ). build ();
httpClient . start ();
String results = httpClient
. get ( "/services/adder-service/add/2/2" ). body ();
System . out . println ( results );出力は4です。
4
Curlからサービスにアクセスすることもできます。
$ curl http://localhost:7000/services/adder-service/add/2/2この完全な例はこちらをご覧ください:QBIT MicroService Getting Wart Tutorial。
QBIT URI ParamsおよびWebSocket Proxyクライアント
Qbitには、軽量で楽しいAsync Microservicesと協力して執筆するためのライブラリがあります。
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build (); /* Setup WebSocket Server support. */
httpServer . setWebSocketOnOpenConsumer ( webSocket -> {
webSocket . setTextMessageConsumer ( message -> {
webSocket . sendText ( "ECHO " + message );
});
}); /* Start the server. */
httpServer . start (); /** CLIENT. */
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start (); /* Setup the client websocket. */
WebSocket webSocket = httpClient
. createWebSocket ( "/websocket/rocket" );
/* Setup the text consumer. */
webSocket . setTextMessageConsumer ( message -> {
System . out . println ( message );
});
webSocket . openAndWait ();
/* Send some messages. */
webSocket . sendText ( "Hi mom" );
webSocket . sendText ( "Hello World!" );
ECHO Hi mom
ECHO Hello World!
サーバーとクライアントを停止します。かなり簡単ですか?
/* Create an HTTP server. */
HttpServer httpServer = httpServerBuilder ()
. setPort ( 8080 ). build ();
/* Setting up a request Consumer with Java 8 Lambda expression. */
httpServer . setHttpRequestConsumer ( httpRequest -> {
Map < String , Object > results = new HashMap <>();
results . put ( "method" , httpRequest . getMethod ());
results . put ( "uri" , httpRequest . getUri ());
results . put ( "body" , httpRequest . getBodyAsString ());
results . put ( "headers" , httpRequest . getHeaders ());
results . put ( "params" , httpRequest . getParams ());
httpRequest . getReceiver ()
. response ( 200 , "application/json" , Boon . toJson ( results ));
});
/* Start the server. */
httpServer . start ();
焦点は、使いやすさと、コールバックにJava 8 Lambdasを使用して、コードがタイトで小さくなることにあります。
QbitのマイクロサービススタイルのWebSocketSupportの詳細については、こちら
それでは、HTTPクライアントを試してみましょう。
/* Setup an httpClient. */
HttpClient httpClient = httpClientBuilder ()
. setHost ( "localhost" ). setPort ( 8080 ). build ();
httpClient . start ();URL、ポートを渡してからスタートを呼び出します。
これで、HTTPリクエストの送信を開始できます。
/* Send no param get. */
HttpResponse httpResponse = httpClient . get ( "/hello/mom" );
puts ( httpResponse );HTTP応答には、サーバーの結果が含まれています。
public interface HttpResponse {
MultiMap < String , String > headers ();
int code ();
String contentType ();
String body ();
}同期するためのヘルパーメソッドがありますhttp get呼び出し。
/* Send one param get. */
httpResponse = httpClient . getWith1Param ( "/hello/singleParam" ,
"hi" , "mom" );
puts ( "single param" , httpResponse );
/* Send two param get. */
httpResponse = httpClient . getWith2Params ( "/hello/twoParams" ,
"hi" , "mom" , "hello" , "dad" );
puts ( "two params" , httpResponse );
...
/* Send five param get. */
httpResponse = httpClient . getWith5Params ( "/hello/5params" ,
"hi" , "mom" ,
"hello" , "dad" ,
"greetings" , "kids" ,
"yo" , "pets" ,
"hola" , "neighbors" );
puts ( "5 params" , httpResponse );
PUTSメソッドは、system.out.printlnを行うヘルパーメソッドです。ちなみに多かれ少なかれ。
最初の5つのパラマはカバーされています。 5を超えて、httpbuilderを使用する必要があります。
/* Send six params with get. */
final HttpRequest httpRequest = httpRequestBuilder ()
. addParam ( "hi" , "mom" )
. addParam ( "hello" , "dad" )
. addParam ( "greetings" , "kids" )
. addParam ( "yo" , "pets" )
. addParam ( "hola" , "pets" )
. addParam ( "salutations" , "all" ). build ();
httpResponse = httpClient . sendRequestAndWait ( httpRequest );
puts ( "6 params" , httpResponse );Async ComplesもGetを求めています。
/* Using Async support with lambda. */
httpClient . getAsync ( "/hi/async" , ( code , contentType , body ) -> {
puts ( "Async text with lambda" , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith1Param ( "/hi/async" , "hi" , "mom" , ( code , contentType , body ) -> {
puts ( "Async text with lambda 1 param n " , body );
});
Sys . sleep ( 100 );
/* Using Async support with lambda. */
httpClient . getAsyncWith2Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 2 params n " , body );
});
Sys . sleep ( 100 );
...
/* Using Async support with lambda. */
httpClient . getAsyncWith5Params ( "/hi/async" ,
"p1" , "v1" ,
"p2" , "v2" ,
"p3" , "v3" ,
"p4" , "v4" ,
"p5" , "v5" ,
( code , contentType , body ) -> {
puts ( "Async text with lambda 5 params n " , body );
});
Sys . sleep ( 100 );[使いやすい、高速マイクロサービスHTTPクライアントの詳細をご覧ください](https://github.com/advastuous/qbit/wiki/%5bdoc%5d-using-qbit-microservice-lib's-httpclient-get、post、 -et-al、-json、-java-8-lambda)。
QBITは、キューの背後にあるサービスも同様に実行することができます。
/* POJO service. */
final TodoManager todoManagerImpl = new TodoManager ();
/*
Create the service which manages async calls to todoManagerImpl.
*/
final Service service = serviceBuilder ()
. setServiceObject ( todoManagerImpl )
. build (). startServiceQueue ();
/* Create Asynchronous proxy over Synchronous service. */
final TodoManagerClientInterface todoManager =
service . createProxy ( TodoManagerClientInterface . class );
service . startCallBackHandler ();
System . out . println ( "This is an async call" );
/* Asynchronous method call. */
todoManager . add ( new Todo ( "Call Mom" , "Give Mom a call" ));
AtomicInteger countTracker = new AtomicInteger ();
//Hold count from async call to service... for testing and showing it is an async callback
System . out . println ( "This is an async call to count" );
todoManager . count ( count -> {
System . out . println ( "This lambda expression is the callback " + count );
countTracker . set ( count );
});
todoManager . clientProxyFlush (); //Flush all methods. It batches calls.
Sys . sleep ( 100 );
System . out . printf ( "This is the count back from the server %d n " , countTracker . get ());InProcサービスに関する詳細なチュートリアルが書かれています。
Qビットイベントバスの詳細な例
Qbitにはサービスイベントバスもあります。この例は、従業員の福利厚生サービスの例です。
2つのチャネルがあります。
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";
従業員のオブジェクトは次のようになります:
public static class Employee {
final String firstName ;
final int employeeId ;この例には、Employee Hiringservice、BenefityService、および給与計算サービスの3つのサービスがあります。
これらのサービスは情報サービスです。 QBITは、WebSocket、HTTP、RESTリモートサービスもサポートしていますが、今のところ、InProcサービスに焦点を当てましょう。 InProcを理解すれば、リモートを理解できます。
従業員のHiringserviceは、実際に他の2つのサービスにイベントを解雇します。
public class EmployeeHiringService {
public void hireEmployee ( final Employee employee ) {
int salary = 100 ;
System . out . printf ( "Hired employee %s n " , employee );
//Does stuff to hire employee
//Sends events
final EventManager eventManager =
serviceContext (). eventManager ();
eventManager . send ( NEW_HIRE_CHANNEL , employee );
eventManager . sendArray ( PAYROLL_ADJUSTMENT_CHANNEL ,
employee , salary );
}
}従業員とその給与を送ることができるように、SendArrayに電話してください。 Payroll_adjustment_channelのリスナーは、新しい従業員の給与を表す従業員とINTの両方を処理する必要があります。イベントバスのプロキシを使用することもできます。そうすることで、イベントバスに電話をかける必要はありません。
福利厚生は、新しい従業員が雇用されているために耳を傾け、福利厚生システムに登録できるようにします。
public static class BenefitsService {
@ OnEvent ( NEW_HIRE_CHANNEL )
public void enroll ( final Employee employee ) {
System . out . printf ( "Employee enrolled into benefits system employee %s %d n " ,
employee . getFirstName (), employee . getEmployeeId ());
}パパは報酬を得る必要があります。
public static class PayrollService {
@ OnEvent ( PAYROLL_ADJUSTMENT_CHANNEL )
public void addEmployeeToPayroll ( final Employee employee , int salary ) {
System . out . printf ( "Employee added to payroll %s %d %d n " ,
employee . getFirstName (), employee . getEmployeeId (), salary );
}
}従業員は、従業員Hiringserviceの従業員オブジェクトです。
だからあなたはあなたの利益を得ることができ、支払うことができます!
詳細をご覧ください:
Qビットイベントバスの詳細な例
イベントバスに独自のインターフェイスを定義でき、QBITで独自のイベントバスを使用できます。サービス内の各モジュールは、独自の内部イベントバスを使用できます。
詳細については、QBIT MicroserviceプライベートイベントバスとQBIT Java Microservice libを使用して、独自のインターフェイスを使用してイベントバスを使用してください。
QBITを本当に把握するには、コールバックの概念を把握する必要があります。
コールバックは、QBITで非同期応答を取得する方法です。
あなたはサービス方法を呼び出し、それはあなたに電話をかけます。
クライアントプロキシはコールバックを持つことができます:
public interface RecommendationServiceClient {
void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName );
}コールバックはJava 8消費者であり、オプションの追加エラー処理があります。
public interface Callback < T > extends java . util . function . Consumer < T > {
default void onError ( java . lang . Throwable error ) { /* compiled code */ }
}ブロックできるサービスは、コールバックを使用する必要があります。したがって、Loaduserが次の例でブロックされている場合、値を返すのではなく、実際にコールバックを使用する必要があります。
パブリッククラスの推奨サービスサービス{
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
public List < Recommendation > recommend ( final String userName ) {
User user = users . get ( userName );
if ( user == null ) {
user = loadUser ( userName );
}
return runRulesEngineAgainstUser ( user );
} loadUserローカルキャッシュを見る必要があるふりをしましょう。ユーザーが見つからない場合は、オフヒープキャッシュを見てください。見つけられない場合は、ユーザーにキャッシュをチェックし、おそらくフォールバックをロードする必要があります。データベースまたは他のサービスからのユーザーデータ。言い換えれば、 loadUser IOでブロックする可能性があります。
クライアントはブロックしませんが、サービスはブロックします。 RecommendationServiceサービスに戻ります。ユーザーロードのキャッシュヒットをたくさん取得した場合、おそらくブロックはそれほど長くはありませんが、それはそこにあり、ユーザーに障害が必要なたびに、システム全体がゴムになります。私たちができることを望んでいるのは、推奨要求を処理できない場合、先に進み、 UserDataServiceに非同期の呼び出しを行うことです。そのASYNCコールバックが戻ってきたら、そのリクエストを処理します。それまでの間、できるだけ早く推奨リストリクエストを処理します。私たちは決してブロックしません。
それでは、サービスを再訪しましょう。最初にやろうとしていることは、サービスメソッドにコールバックを取得させることです。それを行う前に、いくつかのルールを設定しましょう。
public class RecommendationService {
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {今、私たちはコールバックを取得しており、この推奨生成リクエストを処理することをいつ決定することができます。必要なユーザーデータがインメモリである場合、または遅延できる場合は、すぐに実行できます。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in user cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
...
} else {
/* Call the callback now because we can handle the callback now. */
recommendationsCallback . accept ( runRulesEngineAgainstUser ( user ));
}
}ユーザーがキャッシュで見つかった場合、推奨ルールをメモリ内で実行し、 recommendationsCallback.accept(runRulesEngineAgainstUser(user))をすぐに呼び出します。
興味深い部分は、ユーザーがロードされていない場合、私たちが何をするかです。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using Callback. */
userDataService . loadUser ( new Callback < User >() {
@ Override
public void accept ( final User loadedUser ) {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}
}, userName );
}
...ここでは、ユーザーをロードするためにコールバックを使用し、ユーザーがロードされたら、 handleLoadFromUserDataServiceを呼び出します。
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
/** Look for user in users cache. */
User user = users . get ( userName );
/** If the user not found, load the user from the user service. */
if ( user == null ) {
/* Load user using lambda expression. */
userDataService . loadUser (
loadedUser -> {
handleLoadFromUserDataService ( loadedUser ,
recommendationsCallback );
}, userName );
}
...このようなLambdasを使用すると、コードがより読みやすく簡潔になりますが、Lambdaの表現を深く巣にしないでください。そうしないと、コードメンテナンスの悪夢が作成されます。慎重に使用してください。
ユーザーサービスシステムがユーザーをストアからロードした後、推奨事項のリクエストを処理することです。
public class RecommendationService {
private final SimpleLRUCache < String , User > users =
new SimpleLRUCache <>( 10_000 );
private UserDataServiceClient userDataService ;
private BlockingQueue < Runnable > callbacks =
new ArrayBlockingQueue < Runnable >( 10_000 );
...
public void recommend ( final Callback < List < Recommendation >> recommendationsCallback ,
final String userName ) {
...
}
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks queue. */
callbacks . add ( new Runnable () {
@ Override
public void run () {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
}
});
}
public class RecommendationService {
...
/** Handle defered recommendations based on user loads. */
private void handleLoadFromUserDataService ( final User loadedUser ,
final Callback < List < Recommendation >> recommendationsCallback ) {
/** Add a runnable to the callbacks list. */
callbacks . add (() -> {
List < Recommendation > recommendations = runRulesEngineAgainstUser ( loadedUser );
recommendationsCallback . accept ( recommendations );
});
}そこにある重要な部分は、 UserDataServiceからコールバックコールを受けるたびに、CPU集中的な推奨ルールを実行し、発信者をコールバックすることです。まあ、正確には、私たちがしていることは、コールバックのキューに実行可能であることです。後でそれらを繰り返しますが、いつですか?
RecommendationServiceサービスは、キューが空のときに通知できます。新しいバッチが開始され、バッチ制限に達したときに通知できます。これらはすべて、 UserDataServiceからのコールバックを処理するのに良い時期です。
@ QueueCallback ({
QueueCallbackType . EMPTY ,
QueueCallbackType . START_BATCH ,
QueueCallbackType . LIMIT })
private void handleCallbacks () {
flushServiceProxy ( userDataService );
Runnable runnable = callbacks . poll ();
while ( runnable != null ) {
runnable . run ();
runnable = callbacks . poll ();
}
}クライアントからのより多くの入植リクエストを処理する前に、他のサービスからのコールバックを処理したい別のマイクロサービスからコールバックを処理するときに覚えておくことが重要です。基本的に、あなたは待っているクライアントがいます(非同期待っていますが、それでも)。これらのクライアントはHTTP呼び出しのようなオープンなTCP/IP接続を表すかもしれませんので、より多くのリクエストを処理する前にそれらを閉じることが最善です。ユーザーがユーザーサービスをロードするためのオープン接続を備えています。
コールバックの詳細については、Plesaeは[Qbit Java Microservice lib Callback Fundamentals]([ラフカット] QbitマイクロサービスLIBを使用してコールバックを使用しています)。
public class ServiceWorkers {
public static RoundRobinServiceDispatcher workers () {...
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {...シャード労働者(インメモリ、スレッドセーフ、CPU集中サービスの場合)、またはIOの労働者や外国サービスや外国のバスとの会話を作成できます。
ここに、3人のサービスワーカーがいる労働者プールを使用する例があります。
何かをするサービスがあるとしましょう。
//Your POJO
public class MultiWorker {
void doSomeWork (...) {
...
}
}これでこれはある種のIOを行い、これらの銀行を1つだけでなく実行したいので、並行してIOを行うことができます。いくつかのパフォーマンステストの後、3つがマジックナンバーであることがわかりました。
このサービスにアクセスするためにAPIを使用します。
public interface MultiWorkerClient {
void doSomeWork (...);
}それでは、これらの銀行を作成して使用しましょう。
まず、スレッド/キュー/マイクロバッチを追加するQBITサービスを作成します。
/* Create a service builder. */
final ServiceBuilder serviceBuilder = serviceBuilder ();
/* Create some qbit services. */
final Service service1 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service2 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();
final Service service3 = serviceBuilder . setServiceObject ( new MultiWorker ()). build ();次に、サービスワーカーオブジェクトにそれらを追加します。
ServiceWorkers dispatcher ;
dispatcher = workers (); //Create a round robin service dispatcher
dispatcher . addServices ( service1 , service2 , service3 );
dispatcher . start (); // start up the workersサービス、POJO、メソッド消費者、メソッドディスパッチャーをサービスバンドルに追加できます。サービスバンドルは、QBITへの統合ポイントです。
新しいサービスワーカーを追加しましょう。サービスワーカーはServiceMethoddispatcherです。
/* Add the dispatcher to a service bundle. */
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();おそらく、ヘルパーメソッドをサービスバンドルに追加するため、これのほとんどは1回の呼び出しで発生する可能性があります。
これで、労働者の使用を開始できます。
/* Start using the workers. */
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );これで、SpringまたはGuiceを使用して、ビルダーとサービスバンドルを構成できます。ただし、QBIT内部のテストと理解に適した上記のように、それを行うことができます。
QBITは、CPU(ユーザー推奨エンジンの各CPUコアのルールエンジンを実行する)などのシャーディングに適したシャードサービスの概念もサポートしています。
QBITはあなたのサービスを破壊する方法を知らないので、あなたはそれにヒントを与える必要があります。これをシャードルールで行います。
public interface ShardRule {
int shard ( String methodName , Object [] args , int numWorkers );
}サービスの最初の引数がユーザー名であるアプリに取り組み、それを使用してCPU集中的なインメモリルールエンジンへの呼び出しをシャードしました。この手法は機能します。 :)
ServiceWorkersクラスには、破片のワーカープールを作成する方法があります。
public static ShardedMethodDispatcher shardedWorkers ( final ShardRule shardRule ) {
...
}使用する場合は、サービスワーカーを作成するときにシャードキーを渡すだけです。
dispatcher = shardedWorkers (( methodName , methodArgs , numWorkers ) -> {
String userName = methodArgs [ 0 ]. toString ();
int shardKey = userName . hashCode () % numWorkers ;
return shardKey ;
});次に、サービスワーカーの構成にサービスを追加します。
int workerCount = Runtime . getRuntime (). availableProcessors ();
for ( int index = 0 ; index < workerCount ; index ++) {
final Service service = serviceBuilder
. setServiceObject ( new ContentRulesEngine ()). build ();
dispatcher . addServices ( service );
}次に、以前のようにサービスバンドルに追加します。
dispatcher . start ();
bundle = serviceBundleBuilder (). setAddress ( "/root" ). build ();
bundle . addServiceConsumer ( "/workers" , dispatcher );
bundle . start ();次に、それを使用してください:
final MultiWorkerClient worker = bundle . createLocalProxy ( MultiWorkerClient . class , "/workers" );
for ( int index = 0 ; index < 100 ; index ++) {
String userName = "rickhigh" + index ;
worker . pickSuggestions ( userName );
} public class ServiceWorkers {
...
public static ShardedMethodDispatcher shardOnFirstArgumentWorkers () {
...
}
...
public static ShardedMethodDispatcher shardOnFifthArgumentWorkers () {
...
}
public static ShardedMethodDispatcher shardOnBeanPath ( final String beanPath ) {
...
}ShardonbeanPathを使用すると、複雑なBean Path Navigation Callを作成し、そのプロパティを使用してシャードを使用できます。
/* shard on 2nd arg which is an employee
Use the employees department's id property. */
dispatcher = shardOnBeanPath ( "[1].department.id" );
/* Same as above. */
dispatcher = shardOnBeanPath ( "1/department/id" );サービスのシャードとサービスワーカーの詳細については、こちらをご覧ください
Wikiでもっと多くを見つけることができます。また、コミットに従ってください。私たちは忙しいビーバーでした。 qビットJava -json、rest、websocket用のマイクロサービスライブ。