リアルタイム計算とは何ですか?
以下の写真をご覧ください:
従来の計算方法を確認するための例として、ホット製品の統計を取得してください。
1データベースにユーザーの動作、ログ、その他の情報を保存します。
2データベースに注文情報を保存します。
3トリガーまたはコルーチンを使用して、ローカルインデックス、またはリモート独立したインデックスを確立します。
4ジョイン注文情報、注文の詳細、ユーザー情報、製品情報など、製品を20分以内に集約し、トップ10に戻ります。
5WEBまたはアプリディスプレイ。
これは想像上のシーンですが、同様のシーンに対処する経験があると仮定すると、そのような問題や困難を経験する必要があります。
1。水平拡張の問題(スケールアウト)
明らかに、特定のスケールのEコマースWebサイトの場合、データの量は非常に大きいです。トランザクション情報にはトランザクションが含まれるため、関係データベースのトランザクション能力を直接放棄し、より良いスケールアウト機能を備えたNOSQLデータベースに移行することは困難です。
まあ、それは一般的に行われます。幸いなことに、オフラインコンピューティングをバッチ処理することにより、日付でアーカイブして結果をキャッシュできます。
ただし、ここでの要件は20分以内であり、これは困難です。
2。パフォーマンスの問題<BR />この問題は、各ノードに散らばっているため、シェルディングを行ったと仮定しています。
問題は、倉庫に何回入る必要があるかということです。
10分はどうですか?
5分はどうですか?
リアルタイムはどうですか?
さらに、ビジネス層は単一ポイントコンピューティングパワーの制限にも直面しており、水平方向の拡張が必要なため、一貫性の問題を考慮する必要があります。
したがって、ここではすべてが非常に複雑です。
3.ビジネス拡張の問題<BR />ホット販売商品の統計を処理するだけでなく、統計的な広告のクリックを処理する必要があると仮定するか、ユーザーのアクセス動作に基づいてユーザーの特性を迅速に決定して、待機する情報を調整します。レイヤーはより複雑になります。
たぶんあなたはより良い方法を持っているかもしれませんが、実際、私たちが必要とするのは新しい認知です:
この世界で起こったことは本物でした - タイム。
そのため、バッチ処理モデルではなく、リアルタイムで計算されるモデルが必要です。
このモデルは、多くのデータを処理できる必要があるため、優れたスケールアウト機能を考慮する必要があります。
次に、このコンピューティングモデルは実際の時間計算モデルであり、ストリーミングコンピューティングモデルとも見なすことができます。
このようなモデルがあると仮定すると、新しいビジネスシナリオを喜んで設計できます。
最も転送されたワイボは何ですか?
最もホットな製品は何ですか?
誰もが探しているホットスポットは何ですか?
どの広告、どのポジションが最もクリックされていますか?
または、尋ねることができます:
この世界で何が起こったのですか?
最もホットなワイボのトピックは何ですか?
単純なスライドウィンドウカウントを使用して、SO -Caled Real -Time計算の神秘的なベールを明らかにします。
当社のビジネス要件は次のとおりです。
統計10 20分で最もホットなWeiboトピック。
この問題を解決するには、考慮する必要があります。
1。データソース<br />ここで、私たちのデータを想定して、Weibo Long Connection Pushからのトピック。
2。問題モデリング
私たちが考えるトピックは、#数の拡張です。
例:@foreach_break:こんにちは、##、私はあなたを愛しています、#weibo#。
「World」と「Weibo」はトピックです。
3。エンジンの計算
ストームを使用します。
4。時間を定義します
時間を定義する方法は?
時間の定義は、精度が必要なものに応じて、難しいことです。
現実によれば、私たちは通常、この概念を表すためにティックを使用しています。
Stormのインフラストラクチャでは、エグゼキューターのスタートアップフェーズでは、タイマーを使用して「一定期間後」にイベントをトリガーします。
以下に示すように:
(defn setup-ticks![worker executor-data] ive-queue(:receid-queue executor-data)コンテキスト(:worker-context executor-data)]] :component-id executor-data)))))alse(storm-confトポロジ - エクスポリット - メッサージタイムアウト)(=:spout(:type executor-data))))) id exec utor-data) ":"( "("( "(:executor-id executor-data)(schedule-recurring(:user-timer worker)tick-time-secs tick-time-secs(fn [](disruptor/ cublise-queue [[nil(tupleimpl。context[tick-time-secs] constants/system_task_id constants/system_tick_stream_stream_stream_stream_stream_stream_st))))))))))))))、)、)、)、)
そのようなイベントがトリガーされるたびに、下流の下流のボルトがそのようなイベントを受け取ると、結果を増やすか、結果を流れるかを選択できます。
ボルトは、受け取ったタプルが「ダニ」を表しているとどのように判断しますか?
ボルトのエグゼクティブスレッドの管理を担当します。
public static boolean isstick(tuple tuple){retuple! Clojure Code of Setup-Tickと組み合わせてください。
次のコードでは、System_Task_IDがTupleに渡されたことがわかります。
;
(tupleimpl。context[tick-time-secs] constants/system_task_id constants/system_tick_stream_id)))
次に、次のコードを使用して、system_component_idを取得します。
public string getComponentID(int taskid){if(taskid == constants.system_id_id){return constants.system_component_id;}}上記のインフラストラクチャを使用すると、「エンジニアリング」を完了し、アイデアを現実に変えるための手段も必要です。
ここでは、Michael G. Nollのスライディングウィンドウデザインを見てみましょう。
トポロジー
string spoutid = "counterid =" Intermediatorank = "finalranker"; / RollingCountboltのタイムウィンドウは9秒で、統計結果は3秒ごとに下流のBuilter.setBolt(Countrid、New RollingCountbolt(9、3)、4).FieldSgrouping(Spoutid、New Ields( "" word "))に送信されます。 ; //中間装置の一部は、TOP-N TOPIC BUILDER.SETBOLT(新しいIntermediaRankingsbolt(TOP_N)、4)をカウントします完全な集約を完了し、TOP-NトピックBuilder.setBolt(TotalRankerid、新しいTotalRankingsbolt(TOP_N))をカウントします。
上記のトップデザインは次のとおりです。
集約計算を時間と組み合わせます
以前には、コールバック中にボルトの実行方法をトリガーするティックインシデントについて説明しました。
RollingCountbolt:
@Override public void execute(tuple tuple){if(tpleutils.istick(tuple)){log.debug( "ティックタプル、現在のウィンドウのトリガイエミット");それを送信して、ウィンドウをemitcurrentwindowcounts();} else {//従来のタプルをスクロールします。基本的にここでは基本的です。ボルトはスケールアウトすることができます。 EmitCurrentWindowCounts(){Map <Objects = CountCountSthenadvanceWindow(); format(window_lengtt h_warning_template 、actallwindowlengthinseconds、windowlengthinseconds);} emit(counts、actualwindowlengthinseconds);}上記のコードは少し抽象的かもしれません。
intermediankingsbolt&totalrankingsbolt:
Public final void execute(tuple tuple、basicoutputcollectorコレクター){if(tupleutils.istick(tuple)){getlogger(); ;} else {// polytes and sort up updaterankingswithtuple(tuple);}}その中で、インターネットとTotalRankingsboltの集計ソート方法はわずかに異なります。
Mettermediatorankingsbolt Aggregateソートメソッド:
//総合的なソートメソッド:@Override updateankingswithtuple(タプルタプル){//トピックとトピックの数を抽出することです。時間は集約され、その後、すべてのトピックがshaper.getrankings()(ランク付け可能);}。TotalRankingsboltの集計ソートメソッド:
// TotalRankingSolteのソートメソッド@Override updaterankingswithtuple(tple tuple){//インターネットの中間結果の中間結果を提案します()。n、nのみがそれほど大きくないため、重いソートの方法は比較的単純で失礼です。
private void(){collections.sort(rankeditems);結論
以下の図は、T0T1モーメントの間のホットトピック統計を完了しました。
上記は、この記事のすべての内容です。