What is real-time computation?
Please see the picture below:
Take hot-product statistics as an example and look at the traditional way of computing them:
1. Store user behavior, logs, and other information in the database.
2. Store order information in the database.
3. Use triggers or coroutines to build local indexes, or remote standalone indexes.
4. JOIN the order, order-detail, user, and product tables, aggregate per product over the last 20 minutes, and return the top 10 (a sketch of this query follows the list).
5. Display the result on the web or in the app.
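For concreteness, step 4 might look roughly like the following. This is only a sketch under assumed conditions: the table names (orders, order_items), their columns, and the MySQL-style interval syntax are hypothetical, not taken from the article.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedHashMap;
import java.util.Map;

/** Traditional, database-centric hot-product statistics (hypothetical schema). */
public class HotProductsQuery {

    /** Aggregate the last 20 minutes of order items and return the top 10 products. */
    public static Map<Long, Long> top10HotProducts(Connection conn) throws Exception {
        String sql =
            "SELECT oi.product_id, SUM(oi.quantity) AS sold "
          + "FROM orders o JOIN order_items oi ON oi.order_id = o.id "
          + "WHERE o.created_at >= NOW() - INTERVAL 20 MINUTE "
          + "GROUP BY oi.product_id ORDER BY sold DESC LIMIT 10";
        Map<Long, Long> top = new LinkedHashMap<>();
        try (PreparedStatement ps = conn.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                top.put(rs.getLong("product_id"), rs.getLong("sold"));
            }
        }
        return top;
    }
}
```

Every refresh re-scans and re-joins everything from the last 20 minutes, which is exactly what starts to hurt at scale.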
This is an imaginary scenario, but if you have dealt with similar ones, you will have run into the following problems and difficulties:
1. Horizontal scaling (scale-out)
Obviously, for an e-commerce site of any real scale the data volume is huge. Because orders involve transactions, it is hard to simply give up the transactional guarantees of a relational database and migrate to a NoSQL database with better scale-out capability.
Still, it is usually workable: we can archive data by date and cache the results of offline batch jobs.
However, the requirement here is "within the last 20 minutes", and that makes it hard.
2. Performance
This problem goes hand in hand with scale-out. Suppose we have sharded the data: because the table is scattered across nodes, we have to query the database several times and do the aggregation in the business layer.
The question is: how often do we have to hit the database?
What about 10 minutes?
What about 5 minutes?
What about real time?
In addition, the business layer itself is limited by single-node computing power and needs to scale horizontally, which means we also have to consider consistency.
So everything quickly becomes complicated.
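To make the pain concrete, here is a minimal sketch of the "query every shard, then re-aggregate in the business layer" pattern just described. ShardClient and its method are hypothetical stand-ins for whatever per-shard access layer you have, not anything from the article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of the "query every shard, merge in the business layer" pattern. */
public class ShardedHotProducts {

    /** Hypothetical per-shard access layer: productId -> units sold in the last N minutes. */
    public interface ShardClient {
        Map<Long, Long> hotProductsLastMinutes(int minutes) throws Exception;
    }

    public static Map<Long, Long> mergedCounts(List<ShardClient> shards, int minutes) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(shards.size());
        try {
            // One database round trip per shard, every time the statistic is refreshed.
            List<Future<Map<Long, Long>>> futures = new ArrayList<>();
            for (ShardClient shard : shards) {
                futures.add(pool.submit(() -> shard.hotProductsLastMinutes(minutes)));
            }
            // The business layer has to re-aggregate everything the shards return.
            Map<Long, Long> merged = new HashMap<>();
            for (Future<Map<Long, Long>> f : futures) {
                for (Map.Entry<Long, Long> e : f.get().entrySet()) {
                    merged.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Refresh this every few minutes and the fan-out cost grows with both the number of shards and the refresh rate, which is exactly the problem described above.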
3. Business growth
Suppose we must not only compute hot-product statistics but also count ad clicks, or quickly infer a user's characteristics from their access behavior in order to adjust what they see, and so on; the business layer only gets more complicated.
Maybe you have a better approach, but what we really need is a new way of looking at things:
What happens in this world happens in real time.
So we need a model that computes in real time, not a batch-processing model.
The model we need must be able to handle large volumes of data, so it should scale out well, and ideally we should not have to worry much about consistency and replication ourselves.
Such a computing model is a real-time computation model, which can also be viewed as a streaming computation model.
Now, assuming we have such a model, we can happily design new business scenarios:
Which Weibo post is forwarded the most?
Which products are the hottest?
What is everyone searching for right now?
Which ad, in which slot, gets clicked the most?
Or, we can ask:
What happened in this world?
What is the hottest Weibo topic?
Let's use a simple sliding-window count to lift the veil on so-called real-time computation.
Assume our business requirement is:
count the 10 hottest Weibo topics within the last 20 minutes.
To solve this problem, we need to consider:
1. Data source
Here, assume the topics arrive via Weibo's long-lived push connections.
2. Problem modeling
We treat a topic as the text wrapped between '#' marks. The hottest topic is simply the one that appears more often than the others.
For example: @foreach_break: Hello, #world#, I love you, #Weibo#.
"World" and "Weibo" are topics.
3. Computation engine
We use Storm.
4. Defining time
How do we define time?
Defining time is hard; it depends on the accuracy required.
In practice, we generally use a tick to represent this concept.
In Storm's infrastructure, the executor sets up a timer at startup that fires an "a period of time has passed" event.
As shown below:
```clojure
(defn setup-ticks! [worker executor-data]
  (let [storm-conf (:storm-conf executor-data)
        tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
        receive-queue (:receive-queue executor-data)
        context (:worker-context executor-data)]
    (when tick-time-secs
      (if (or (system-id? (:component-id executor-data))
              (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
                   (= :spout (:type executor-data))))
        (log-message "Timeouts disabled for executor "
                     (:component-id executor-data) ":" (:executor-id executor-data))
        (schedule-recurring
          (:user-timer worker)
          tick-time-secs
          tick-time-secs
          (fn []
            (disruptor/publish
              receive-queue
              [[nil (TupleImpl. context [tick-time-secs]
                                Constants/SYSTEM_TASK_ID
                                Constants/SYSTEM_TICK_STREAM_ID)]])))))))
```

Every tick-time-secs seconds, such an event is triggered. When a downstream bolt that subscribes to this stream receives the event, it can choose whether to keep accumulating results or to emit them into the stream.
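One detail worth spelling out: tick-time-secs above comes from the component's configuration, so a bolt that wants ticks has to ask for them. In storm-starter, RollingCountBolt does this via getComponentConfiguration; the sketch below shows the idea, assuming the classic backtype.storm packages (newer Storm releases use org.apache.storm) and a class name of my own choosing.

```java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.topology.base.BaseRichBolt;

/** A bolt asks the executor to schedule tick tuples by exposing the tick frequency in its config. */
public abstract class TickAwareBolt extends BaseRichBolt {

    private final int emitFrequencyInSeconds;

    protected TickAwareBolt(int emitFrequencyInSeconds) {
        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        // Every emitFrequencyInSeconds, the executor publishes a tick tuple
        // on SYSTEM_TICK_STREAM_ID to this bolt's receive queue.
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }
}
```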
How does a bolt decide that a received tuple represents a "tick"?
The executor thread that manages the bolt consumes messages from the receive queue and dispatches them to the bolt's execute method. Inside execute, you can check:
```java
public static boolean isTick(Tuple tuple) {
    return tuple != null
           && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
```

Combining this with the Clojure code of setup-ticks!, we can see that SYSTEM_TICK_STREAM_ID is passed into the tuple in the callback of the timer event. But where does SYSTEM_COMPONENT_ID come from?
As the following code shows, SYSTEM_TASK_ID is also passed into the tuple:
```clojure
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)
```
The SYSTEM_COMPONENT_ID is then resolved with the following code:
```java
public String getComponentId(int taskId) {
    if (taskId == Constants.SYSTEM_TASK_ID) {
        return Constants.SYSTEM_COMPONENT_ID;
    } else {
        return _taskToComponent.get(taskId);
    }
}
```

With the infrastructure above in place, we still need some engineering to turn the idea into reality.
Here, let's look at Michael G. Noll's sliding-window design.
Topology
```java
String spoutId = "wordGenerator";
String counterId = "counter";
String intermediateRankerId = "intermediateRanker";
String totalRankerId = "finalRanker";
// Here, assume TestWordSpout is the source of topic tuples
builder.setSpout(spoutId, new TestWordSpout(), 5);
// RollingCountBolt's time window is 9 seconds; it emits its counts downstream every 3 seconds
builder.setBolt(counterId, new RollingCountBolt(9, 3), 4)
       .fieldsGrouping(spoutId, new Fields("word"));
// IntermediateRankingsBolt does partial aggregation, computing the top-N topics it has seen
builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4)
       .fieldsGrouping(counterId, new Fields("obj"));
// TotalRankingsBolt does the final aggregation, computing the overall top-N topics
builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N))
       .globalGrouping(intermediateRankerId);
```

The topology designed above is shown in the figure:
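As an aside, the wiring above can be run end to end in local mode roughly as follows. This is a sketch, not the article's code: the storm.starter.bolt and backtype.storm package names are assumptions based on the classic storm-starter layout, and the comment shows how the original 20-minute requirement could map onto the RollingCountBolt parameters.

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.IntermediateRankingsBolt;
import storm.starter.bolt.RollingCountBolt;
import storm.starter.bolt.TotalRankingsBolt;

public class RollingTopTopics {

    public static void main(String[] args) throws Exception {
        final int TOP_N = 10;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordGenerator", new TestWordSpout(), 5);
        // 9-second window, emitted every 3 seconds; for the original requirement
        // (hottest topics over 20 minutes) this could be new RollingCountBolt(1200, 60).
        builder.setBolt("counter", new RollingCountBolt(9, 3), 4)
               .fieldsGrouping("wordGenerator", new Fields("word"));
        builder.setBolt("intermediateRanker", new IntermediateRankingsBolt(TOP_N), 4)
               .fieldsGrouping("counter", new Fields("obj"));
        builder.setBolt("finalRanker", new TotalRankingsBolt(TOP_N))
               .globalGrouping("intermediateRanker");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("rolling-top-topics", new Config(), builder.createTopology());
        Thread.sleep(60 * 1000);   // let the topology run for a minute in local mode
        cluster.shutdown();
    }
}
```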
Combining aggregation with time
Earlier we described the tick event. When a tick tuple arrives at a bolt, its execute method is invoked, and we can handle it like this:
RollingCountBolt:
```java
@Override
public void execute(Tuple tuple) {
    if (TupleUtils.isTick(tuple)) {
        LOG.debug("Received tick tuple, triggering emit of current window counts");
        // The tick has arrived: emit the counts of the current time window and let the window roll
        emitCurrentWindowCounts();
    } else {
        // A regular tuple: count the topic
        countObjAndAck(tuple);
    }
}

// obj is the topic; increment its count.
// Note: the rate here depends entirely on the speed of the stream; it may be a million
// tuples per second, or only tens per second.
// Not enough memory? The bolt can be scaled out.
private void countObjAndAck(Tuple tuple) {
    Object obj = tuple.getValue(0);
    counter.incrementCount(obj);
    collector.ack(tuple);
}

// Send the counts of the current window downstream
private void emitCurrentWindowCounts() {
    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
    lastModifiedTracker.markAsModified();
    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
        LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE,
                               actualWindowLengthInSeconds, windowLengthInSeconds));
    }
    emit(counts, actualWindowLengthInSeconds);
}
```

The code above may feel a bit abstract; the picture makes it clear: as soon as the tick arrives, the window rolls.
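The rolling behavior comes from a slot-based counter: the window is divided into slots, and each tick reports the totals over all slots and then reuses the oldest one. storm-starter's SlidingWindowCounter works along these lines; the following is a simplified, self-contained sketch (the class and method names are mine, not the original):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified slot-based sliding window counter: the window is split into
 * windowLengthInSlots slots, and each tick advances the head slot.
 */
public class SimpleSlidingWindowCounter<T> {

    private final Map<T, long[]> countsPerObject = new HashMap<>();
    private final int windowLengthInSlots;
    private int headSlot = 0;

    public SimpleSlidingWindowCounter(int windowLengthInSlots) {
        this.windowLengthInSlots = windowLengthInSlots;
    }

    public void incrementCount(T obj) {
        countsPerObject.computeIfAbsent(obj, k -> new long[windowLengthInSlots])[headSlot]++;
    }

    /** Called on every tick: report the window totals, then slide the window by one slot. */
    public Map<T, Long> getCountsThenAdvanceWindow() {
        Map<T, Long> totals = new HashMap<>();
        for (Map.Entry<T, long[]> e : countsPerObject.entrySet()) {
            long sum = 0;
            for (long c : e.getValue()) {
                sum += c;
            }
            totals.put(e.getKey(), sum);
        }
        headSlot = (headSlot + 1) % windowLengthInSlots;
        // The new head slot must start empty, otherwise counts that have just
        // fallen out of the window would leak into the next one.
        for (long[] slots : countsPerObject.values()) {
            slots[headSlot] = 0;
        }
        return totals;
    }
}
```

With a 9-second window emitted every 3 seconds, such a counter would use 9 / 3 = 3 slots.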
IntermediateRankingsBolt & TotalRankingsBolt:
```java
public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleUtils.isTick(tuple)) {
        getLogger().debug("Received tick tuple, triggering emit of current rankings");
        // Send the aggregated and sorted rankings downstream
        emitRankings(collector);
    } else {
        // Aggregate and sort
        updateRankingsWithTuple(tuple);
    }
}
```

The aggregate-and-sort methods of IntermediateRankingsBolt and TotalRankingsBolt differ slightly.
IntermediateRankingsBolt's aggregate-and-sort method:
```java
@Override
void updateRankingsWithTuple(Tuple tuple) {
    // Extract the topic and its count from the tuple
    Rankable rankable = RankableObjectWithFields.from(tuple);
    // Aggregate the counts per topic, then re-sort all topics
    super.getRankings().updateWith(rankable);
}
```

TotalRankingsBolt's aggregate-and-sort method:
```java
@Override
void updateRankingsWithTuple(Tuple tuple) {
    // Extract the intermediate rankings produced by IntermediateRankingsBolt
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    // Merge them and re-sort
    super.getRankings().updateWith(rankingsToBeMerged);
    // Prune zero counts to save memory
    super.getRankings().pruneZeroCounts();
}
```

The re-sort itself is simple and brute-force, because only the top N items are kept and N is never very large:
```java
private void rerank() {
    Collections.sort(rankedItems);
    Collections.reverse(rankedItems);
}
```

Conclusion
The figure below may be just the result we want: the hot-topic statistics for the interval from T0 to T1. (The foreach_break watermark is only there to prevent piracy.)
That is all for this article. I hope you enjoyed it.