Those who have read my previous articles may know that I work in game development. Many of my ideas and starting points come from game-server design, so they may conflict with, or simply not fit, common web practice.
Let me explain why I want a custom thread model.
In the MMORPG servers I have built, the threads are divided into the main thread, the global synchronization thread, the chat thread, the team thread, the map thread, the map message distribution and dispatch thread, and so on.
All of this has to be controlled according to my own thread division and data flow.
The main job of a game server is: accept the player's command request -> perform the corresponding operation -> return the result.
Every message on the server side is registered in the message manager, and the thread model that will process it is specified at registration time.
If a message needs to be handed to the player's map thread for processing, it must be registered with that thread model (the map message distribution and dispatch thread).
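The registration step can be pictured with a rough sketch. This is not my actual message manager: `MessageRegistry`, `register`, `route`, the message ids and the thread model ids are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: binds each message id to the id of the thread model
// that must process it. None of these names come from the real engine.
class MessageRegistry {
    // message id -> thread model id
    private final Map<Integer, Long> routes = new HashMap<>();

    void register(int messageId, long threadModelId) {
        routes.put(messageId, threadModelId);
    }

    /** Returns the thread model id for a message, or -1 if it was never registered. */
    long route(int messageId) {
        Long id = routes.get(messageId);
        return id == null ? -1L : id;
    }
}

class MessageRegistryDemo {
    static final long MAP_THREAD = 1L;  // assumed id of the map dispatch thread model
    static final long CHAT_THREAD = 2L; // assumed id of the chat thread model

    static MessageRegistry build() {
        MessageRegistry registry = new MessageRegistry();
        registry.register(1001, MAP_THREAD);  // e.g. a "move" request
        registry.register(2001, CHAT_THREAD); // e.g. a "chat" request
        return registry;
    }

    public static void main(String[] args) {
        MessageRegistry registry = build();
        System.out.println("1001 -> thread model " + registry.route(1001));
        System.out.println("2001 -> thread model " + registry.route(2001));
    }
}
```

When a request arrives, the network layer would look up the thread model id and submit the handler as a task to that model, so the handler always runs on the thread that owns the data.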
Let's analyze the thread model.
Before looking at the thread model code, let's first look at my task model:
package net.sz.engine.thread;

import java.io.Serializable;
import org.apache.log4j.Logger;
import net.sz.engine.structs.ObjectAttribute;
import net.sz.engine.structs.ObjectGlobal;

/**
 * Task model
 * <br>
 * author Falling Programmer<br>
 * phone 13882122019<br>
 */
public abstract class TaskEvent implements Serializable, Cloneable {

    private static final Logger log = Logger.getLogger(TaskEvent.class);
    private static final long serialVersionUID = 4196020659994845804L;

    // Runtime data
    private transient final ObjectAttribute runOther = new ObjectAttribute();
    // Task creation time
    protected long createTime;
    // Task unique id
    protected long taskId;
    // Whether the task has been canceled
    protected boolean cancel = false;

    public TaskEvent() {
        this.runOther.put("submitTime", System.currentTimeMillis());
        createTime = System.currentTimeMillis();
        cancel = false;
        taskId = ObjectGlobal.getUUID();
    }

    public long getCreateTime() {
        return createTime;
    }

    public void setCreateTime(long createTime) {
        this.createTime = createTime;
    }

    public long getSubmitTime() {
        return this.runOther.getlongValue("submitTime");
    }

    public ObjectAttribute getRunOther() {
        return runOther;
    }

    public boolean isCancel() {
        return cancel;
    }

    public void setCancel(boolean cancel) {
        this.cancel = cancel;
    }

    public abstract void run();

    @Override
    public Object clone() throws CloneNotSupportedException {
        return super.clone();
    }
}
package net.sz.engine.thread;

/**
 * Timer executor
 * <br>
 * author Failed programmer<br>
 * phone 13882122019<br>
 */
public abstract class TimerTaskEvent extends TaskEvent {

    private static final long serialVersionUID = -8331296295264699207L;

    /** Time to start execution */
    protected long startTime;
    /** Whether to execute once at the beginning */
    protected boolean startAction;
    /** End time */
    protected long endTime;
    /** Number of executions */
    protected int actionCount;
    /** Interval between executions */
    protected int intervalTime;

    /**
     * @param startTime     start time
     * @param isStartAction whether to execute once at the beginning
     * @param endTime       end time
     * @param actionCount   number of executions
     * @param intervalTime  interval time
     */
    public TimerTaskEvent(long startTime, boolean isStartAction, long endTime, int actionCount, int intervalTime) {
        super();
        this.startTime = startTime;
        this.startAction = isStartAction;
        this.endTime = endTime;
        this.actionCount = actionCount;
        this.intervalTime = intervalTime;
    }

    /** Specify the start time of the task. */
    public TimerTaskEvent(long startTime, boolean isStartAction, int actionCount, int intervalTime) {
        this(startTime, isStartAction, 0, actionCount, intervalTime);
    }

    /** Specify the end time; the task stops at the end time even if the execution count has not been reached. */
    public TimerTaskEvent(boolean isStartAction, long endTime, int actionCount, int intervalTime) {
        this(0, isStartAction, endTime, actionCount, intervalTime);
    }

    /** Specify the start time and end time. */
    public TimerTaskEvent(long startTime, long endTime, int intervalTime) {
        this(startTime, false, endTime, -1, intervalTime);
    }

    /** Specify the number of executions and the interval time. */
    public TimerTaskEvent(int actionCount, int intervalTime) {
        this(0, false, 0, actionCount, intervalTime);
    }

    /** Execute without limit after submission, at the given interval. */
    public TimerTaskEvent(int intervalTime) {
        this(0, false, 0, -1, intervalTime);
    }

    public long getStartTime() {
        return startTime;
    }

    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }

    public boolean isStartAction() {
        return startAction;
    }

    public void setStartAction(boolean startAction) {
        this.startAction = startAction;
    }

    public long getEndTime() {
        return endTime;
    }

    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }

    public int getActionCount() {
        return actionCount;
    }

    public void setActionCount(int actionCount) {
        this.actionCount = actionCount;
    }

    public int getIntervalTime() {
        return intervalTime;
    }

    public void setIntervalTime(int intervalTime) {
        this.intervalTime = intervalTime;
    }
}
Those are the task model and the timer task model. Next comes the thread model itself:
package net.sz.engine.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.sz.engine.structs.ObjectGlobal;
import net.sz.engine.utils.MailUtil;
import net.sz.engine.utils.StringUtil;
import org.apache.log4j.Logger;

/**
 * Thread model
 * <br>
 * author Failed Programmer<br>
 * phone 13882122019<br>
 */
public class ThreadModel implements Runnable {

    private static final Logger log = Logger.getLogger(ThreadModel.class);
    private static long threadID = 0;
    protected static final Object SYN_OBJECT = new Object();
    protected long tid;
    protected String name;
    protected long lastSendMail = 0;

    protected final ArrayList<MyThread> threads = new ArrayList<>();
    /** Task queue (thread-safe) */
    protected final ConcurrentLinkedQueue<TaskEvent> taskQueue = new ConcurrentLinkedQueue<>();
    /** Timer task list, always accessed under synchronized (timerQueue) */
    protected final List<TimerTaskEvent> timerQueue = new ArrayList<>();
    // false means the thread model has been stopped or removed
    protected volatile boolean running = true;

    public ThreadModel(ThreadGroup group) {
        this(group, "No name", 1);
    }

    public ThreadModel(String name) {
        this(ThreadPool.UnknownThreadGroup, name, 1);
    }

    public ThreadModel(ThreadGroup group, String name, int threadCount) {
        this(group, name, threadCount, null);
    }

    public ThreadModel(ThreadGroup group, String name, int threadCount, Runnable runnable) {
        synchronized (SYN_OBJECT) {
            threadID++;
            tid = threadID;
        }
        for (int i = 1; i <= threadCount; i++) {
            MyThread thread;
            if (runnable == null) {
                thread = new MyThread(tid, group, this, name + "-" + tid + "-" + i);
            } else {
                thread = new MyThread(tid, group, runnable, name + "-" + tid + "-" + i);
            }
            thread.start();
            threads.add(thread);
        }
        this.name = name;
    }

    /** Thread name */
    public String getName() {
        return name;
    }

    /** Custom id of the thread model */
    public long getId() {
        return this.tid;
    }

    /**
     * Add a new task. Every time a task is added, the task queue must be woken up.
     */
    public void addTask(TaskEvent runnable) {
        taskQueue.add(runnable);
        synchronized (taskQueue) {
            /* Wake up the waiting workers so they start executing */
            taskQueue.notifyAll();
        }
    }

    /** Add a timer task to this thread model */
    public void addTimer(TimerTaskEvent runnable) {
        synchronized (timerQueue) {
            if (running) {
                // Execute once at the beginning if requested
                if (runnable.startAction) {
                    addTask(runnable);
                }
                timerQueue.add(runnable);
            } else {
                log.error("Thread has stopped");
            }
        }
    }

    /** Timer executor, called by the global timer thread */
    public void timerRun() {
        ArrayList<TimerTaskEvent> taskModels;
        synchronized (timerQueue) {
            // Take a snapshot of the timer list so we can iterate outside the lock
            taskModels = new ArrayList<>(timerQueue);
        }
        for (TimerTaskEvent timerEvent : taskModels) {
            int execCount = timerEvent.getRunOther().getintValue("Execcount");
            long lastTime = timerEvent.getRunOther().getlongValue("LastExecTime");
            long nowTime = System.currentTimeMillis();
            if (lastTime == 0) {
                timerEvent.getRunOther().put("LastExecTime", nowTime);
            } else if (timerEvent.isCancel()) {
                // The task has been canceled
                removeTimer(timerEvent);
            } else if (timerEvent.getEndTime() > 0 && nowTime >= timerEvent.getEndTime()) {
                // The end time has passed. (The original compared with "<" inside the
                // execute branch, which removed the timer before its end time.)
                removeTimer(timerEvent);
            } else if (nowTime > timerEvent.getStartTime() // start time reached
                    && (nowTime - timerEvent.getSubmitTime() > timerEvent.getIntervalTime()) // interval elapsed since submission
                    && (nowTime - lastTime >= timerEvent.getIntervalTime())) { // interval elapsed since last execution
                // Submit the timer task for execution
                this.addTask(timerEvent);
                // Record the execution
                execCount++;
                timerEvent.getRunOther().put("Execcount", execCount);
                timerEvent.getRunOther().put("LastExecTime", nowTime);
                // Remove the timer once the execution count is reached
                if (timerEvent.getActionCount() > 0 && timerEvent.getActionCount() <= execCount) {
                    removeTimer(timerEvent);
                }
            }
        }
    }

    private void removeTimer(TimerTaskEvent timerEvent) {
        synchronized (timerQueue) {
            timerQueue.remove(timerEvent);
        }
        log.debug("Clean the timer task: " + timerEvent.getClass().getName());
    }

    /** View the thread stacks of workers that appear to be stuck */
    public void showStackTrace() {
        StringBuilder buf = new StringBuilder();
        for (MyThread currentThread : threads) {
            long procc = System.currentTimeMillis() - currentThread.getLastExecuteTime();
            // More than 5 seconds but less than 1 day: lastExecuteTime is 0 while idle,
            // and the value may be inaccurate because it is read from another thread
            if (procc > 5 * 1000 && procc < 86400000L) {
                TaskEvent command = currentThread.getLastCommand();
                buf.append("thread[").append(currentThread.getName())
                        .append("] may be stuck -> ").append(procc / 1000f).append("s\n ")
                        .append("Execute task: ")
                        .append(command == null ? "null" : command.getClass().getName());
                try {
                    StackTraceElement[] elements = currentThread.getStackTrace();
                    for (int i = 0; i < elements.length; i++) {
                        buf.append("\n ")
                                .append(elements[i].getClassName()).append(".")
                                .append(elements[i].getMethodName())
                                .append("(").append(elements[i].getFileName())
                                .append(":").append(elements[i].getLineNumber()).append(")");
                    }
                } catch (Exception e) {
                    buf.append(e);
                }
                buf.append("\n++++++++++++++++++++++++++++++++++");
            }
        }
        String toString = buf.toString();
        if (!StringUtil.isNullOrEmpty(toString)) {
            log.error(toString);
            // Send at most one mail every 5 minutes
            if (System.currentTimeMillis() - lastSendMail > 5 * 60 * 1000) {
                lastSendMail = System.currentTimeMillis();
                MailUtil.sendMail("Thread execution is stuck -> Game id-" + ObjectGlobal.GameID
                        + " Platform-" + ObjectGlobal.Platform
                        + " Server id-" + ObjectGlobal.ServerID, toString);
            }
        }
    }

    @Override
    public void run() {
        MyThread currentThread = (MyThread) Thread.currentThread();
        while (running) {
            while (taskQueue.isEmpty() && running) {
                try {
                    /* The task queue is empty: wait until a new task is added and wakes us up */
                    synchronized (taskQueue) {
                        taskQueue.wait(500);
                    }
                } catch (InterruptedException ie) {
                    log.error(ie);
                }
            }
            /* Take out the next task. Reset first so a stopped model never re-runs the previous task. */
            currentThread.lastCommand = null;
            if (running) {
                currentThread.lastCommand = taskQueue.poll();
            }
            if (currentThread.lastCommand != null) {
                if (currentThread.lastCommand.isCancel()) {
                    // The task has been canceled
                    continue;
                }
                /* Execute the task */
                currentThread.lastExecuteTime = System.currentTimeMillis();
                try {
                    currentThread.lastCommand.run();
                } catch (Exception e) {
                    log.error("Worker<\"" + currentThread.getName() + "\"> executing task<"
                            + currentThread.lastCommand.getClass().getName() + "> encountered an error: ", e);
                }
                long timeL1 = System.currentTimeMillis() - currentThread.lastExecuteTime;
                if (timeL1 <= 20) {
                    // Fast enough, nothing to log
                } else if (timeL1 <= 100L) {
                    log.info("Worker<\"" + currentThread.getName() + "\"> completed task: "
                            + currentThread.lastCommand.toString() + " execution time: " + timeL1);
                } else if (timeL1 <= 200L) {
                    log.info("Worker<\"" + currentThread.getName() + "\"> took a long time to complete task: "
                            + currentThread.lastCommand.toString()
                            + " consider reviewing the task script logic, time taken: " + timeL1);
                } else {
                    log.info("Worker<\"" + currentThread.getName() + "\"> took a very long time to complete task: "
                            + currentThread.lastCommand.toString()
                            + " consider whether the task script should be removed, time taken: " + timeL1);
                }
                currentThread.lastExecuteTime = 0;
            }
        }
        log.error("Thread ends, worker<\"" + Thread.currentThread().getName() + "\"> exit");
    }

    /** Custom thread */
    public class MyThread extends Thread {

        /**
         * @param tid   custom thread id
         * @param group thread group
         * @param run   the Runnable to execute
         * @param name  thread name
         */
        public MyThread(long tid, ThreadGroup group, Runnable run, String name) {
            super(group, run, name);
            this._id = tid;
        }

        // Custom id of the thread
        public long _id;
        // Task currently being executed
        public volatile TaskEvent lastCommand;
        // Time the current task started executing (0 while idle)
        public volatile long lastExecuteTime = 0;

        public TaskEvent getLastCommand() {
            return lastCommand;
        }

        public long getLastExecuteTime() {
            return lastExecuteTime;
        }

        /** Returns the custom thread id */
        @Override
        public long getId() {
            return _id;
        }
    }

    /** Stop the thread model: only sets the stop flag, the threads are not terminated immediately */
    public void stop() {
        this.running = false;
    }

    public boolean isRuning() {
        return running;
    }

    @Override
    public String toString() {
        return "Thread{" + "tid=" + tid + ",Name=" + this.getName() + '}';
    }
}
Look at the ThreadModel constructor:

public ThreadModel(ThreadGroup group, String name, int threadCount, Runnable runnable) {
    synchronized (SYN_OBJECT) {
        threadID++;
        tid = threadID;
    }
    for (int i = 1; i <= threadCount; i++) {
        MyThread thread;
        if (runnable == null) {
            thread = new MyThread(tid, group, this, name + "-" + tid + "-" + i);
        } else {
            thread = new MyThread(tid, group, runnable, name + "-" + tid + "-" + i);
        }
        thread.start();
        threads.add(thread);
    }
    this.name = name;
}

As can be seen, the constructor starts instances of the MyThread class declared inside ThreadModel, and threadCount decides how many of them run.
Why did I design it this way? For work such as writing logs, where there is no shared data and no critical section in the processing flow, I can use N threads to handle it; no dirty data will be produced.
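To illustrate the "N threads, no shared data" case, here is a minimal sketch that uses the JDK's `ExecutorService` as a stand-in for a ThreadModel with several MyThread workers. `IndependentWorkDemo` and `formatAll` are hypothetical names, and formatting a string stands in for writing a log record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class IndependentWorkDemo {
    /** Formats each record on a pool of `threads` workers; the tasks share no mutable state. */
    static List<String> formatAll(List<String> records, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String record : records) {
                // Each task only reads its own record, so no lock is needed
                futures.add(pool.submit(() -> "[log] " + record));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> future : futures) {
                out.add(future.get()); // collected in submission order
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(formatAll(Arrays.asList("login", "move", "logout"), 3));
    }
}
```

Because every task touches only its own input, it does not matter which worker runs it or in what order the workers finish.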
For requests such as forming a team or casting a skill, the work has to go through a single queue, so the ThreadModel must contain exactly one MyThread. This amounts to serial (queued) execution, which solves the shared-data and critical-section problem without depending on locks.
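The single-queue idea can be sketched with a JDK single-thread executor standing in for a ThreadModel with one MyThread. `SerialQueueDemo`, `Team` and `joinAll` are hypothetical names; the point is that the plain `int` field needs no lock because only the one worker thread ever touches it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerialQueueDemo {
    // Plain, unsynchronized state: safe only because a single worker thread accesses it
    static final class Team {
        int members = 0;
    }

    /** Submits join requests from several producer threads; returns the final member count. */
    static int joinAll(int producers, int requestsPerProducer) {
        Team team = new Team();
        ExecutorService teamThread = Executors.newSingleThreadExecutor();
        try {
            Thread[] senders = new Thread[producers];
            for (int p = 0; p < producers; p++) {
                senders[p] = new Thread(() -> {
                    for (int i = 0; i < requestsPerProducer; i++) {
                        teamThread.execute(() -> team.members++); // queued, run one at a time
                    }
                });
                senders[p].start();
            }
            for (Thread sender : senders) {
                sender.join();
            }
            // Read the result on the worker thread too, after all queued requests
            return teamThread.submit(() -> team.members).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            teamThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinAll(4, 1000));
    }
}
```

The producers run concurrently, but the increments themselves are executed strictly one after another on the single worker, which is why no update is lost.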
My drawing is very ugly, please forgive me.
As shown in the picture above, each ThreadModel has two queues, one of TimerTaskEvent and one of TaskEvent, and there is one global timer thread.
The job of the global timer thread is to find the TimerTaskEvents in each ThreadModel that are due and add them to that model's TaskEvent queue; what finally gets executed is always the TaskEvent queue.
Why are TimerTaskEvents stored in the corresponding ThreadModel? Because if, for example, thread A (a ThreadModel instance) has run for a while and I need to shut it down and free its resources, I would otherwise have to go somewhere else to find its timer tasks and remove them.
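The promotion step can be shown with a simplified, single-threaded sketch. `TimerPromotionDemo`, `Timer` and `simulate` are hypothetical stand-ins, the clock is passed in explicitly so the behavior is deterministic, and the real code additionally needs the synchronization shown in ThreadModel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class TimerPromotionDemo {
    /** Simplified stand-in for TimerTaskEvent: a name, an interval and the last execution time. */
    static final class Timer {
        final String name;
        final long intervalMs;
        long lastExec;

        Timer(String name, long intervalMs, long submittedAt) {
            this.name = name;
            this.intervalMs = intervalMs;
            this.lastExec = submittedAt;
        }
    }

    final List<Timer> timerQueue = new ArrayList<>();
    final Queue<String> taskQueue = new ArrayDeque<>();

    /** One pass of the global timer thread over this model at time `now`. */
    void timerRun(long now) {
        for (Timer timer : timerQueue) {
            if (now - timer.lastExec >= timer.intervalMs) {
                taskQueue.add(timer.name); // due: promote into the task queue
                timer.lastExec = now;
            }
        }
    }

    /** Ticks the clock from 0 to 1000 ms in 250 ms steps and returns what was queued. */
    static List<String> simulate() {
        TimerPromotionDemo model = new TimerPromotionDemo();
        model.timerQueue.add(new Timer("heartbeat", 500, 0));
        model.timerQueue.add(new Timer("save", 1000, 0));
        for (long now = 0; now <= 1000; now += 250) {
            model.timerRun(now);
        }
        return new ArrayList<>(model.taskQueue);
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // prints [heartbeat, heartbeat, save]
    }
}
```

The timer pass never runs the task itself; it only queues it, so timer tasks execute on the same worker thread as ordinary tasks.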
Global timer thread

package net.sz.engine.thread;

import java.util.HashMap;
import java.util.Map;

/**
 * Global timer thread
 * <br>
 * author Falling programmer<br>
 * phone 13882122019<br>
 */
class TimerThread extends Thread {

    private static final Object SYN_OBJECT = new Object();

    public TimerThread() {
        super(ThreadPool.GloblThreadGroup, "Global Timer Thread");
    }

    @Override
    public void run() {
        while (true) {
            synchronized (SYN_OBJECT) {
                try {
                    SYN_OBJECT.wait(2);
                } catch (InterruptedException ex) {
                }
            }
            // Iterate over a snapshot of all registered thread models
            HashMap<Long, ThreadModel> hashMap = new HashMap<>(ThreadPool.getThreadMap());
            for (Map.Entry<Long, ThreadModel> entrySet : hashMap.entrySet()) {
                ThreadModel value = entrySet.getValue();
                value.timerRun();
            }
        }
    }
}
Thread model manager
package net.sz.engine.thread;

import java.util.concurrent.ConcurrentHashMap;
import net.sz.engine.script.manager.ScriptManager;
import net.sz.engine.timer.GlobTimerEvent;
import net.sz.engine.timer.PrintlnServerMemoryTimerEvent;
import org.apache.log4j.Logger;

/**
 * Thread manager
 * <br>
 * author Failed Programmer<br>
 * phone 13882122019<br>
 */
public class ThreadPool {

    static private final Logger log = Logger.getLogger(ThreadPool.class);
    static public final long GloblThread;
    static private final TimerThread GloblTimerThread;
    static final long CheckThreadTimerThreadModel;
    static public final ThreadGroup GloblThreadGroup = new ThreadGroup("Global ThreadGroup");
    static public final ThreadGroup UnknownThreadGroup = new ThreadGroup(GloblThreadGroup, "Unknown ThreadGroup");
    static private final ConcurrentHashMap<Long, ThreadModel> threadMap = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        ThreadPool.addTimerTask(GloblThread, new TimerTaskEvent(1000) {
            @Override
            public void run() {
                log.error("sssss");
            }
        });
    }

    static {
        // Create the global thread
        GloblThread = addThreadModel(GloblThreadGroup, "GloblThread");
        // Timer that triggers the scheduled scripts
        addTimerTask(GloblThread, new GlobTimerEvent(ScriptManager.getInstance().getBaseScriptEntry()));
        // Timer that reports server memory consumption
        addTimerTask(GloblThread, new PrintlnServerMemoryTimerEvent());
        // Create the timer thread
        GloblTimerThread = new TimerThread();
        GloblTimerThread.start();
        // Check for stuck threads
        CheckThreadTimerThreadModel = addThreadModel(GloblThreadGroup, "Check ThreadTimer Event");
        addTimerTask(CheckThreadTimerThreadModel, new CheckThreadTimerEvent());
    }

    /** Remove the thread model with the given id and set it to the stopped state */
    static public ThreadModel remove(long tid) {
        ThreadModel remove = threadMap.remove(tid);
        if (remove != null) {
            remove.stop();
        }
        return remove;
    }

    /** Get all thread models in the thread pool */
    static public ConcurrentHashMap<Long, ThreadModel> getThreadMap() {
        return threadMap;
    }

    /** Get one thread model from the thread pool */
    static public ThreadModel getThreadModel(long threadId) {
        ThreadModel get = threadMap.get(threadId);
        if (get == null) {
            log.error("The thread model cannot be found: " + threadId,
                    new Exception("The thread model cannot be found: " + threadId));
        }
        return get;
    }

    /** Register a thread model in the default group UnknownThreadGroup */
    static public long addThreadModel(String name) {
        return addThreadModel(UnknownThreadGroup, name);
    }

    /** Register a thread model with the given thread count in the default group UnknownThreadGroup */
    static public long addThreadModel(String name, int threadcount) {
        return addThreadModel(UnknownThreadGroup, name, threadcount);
    }

    /** Register a single-threaded thread model in the given group */
    static public long addThreadModel(ThreadGroup group, String name) {
        return addThreadModel(group, name, 1);
    }

    /** Register a thread model with the given thread count in the given group */
    static public long addThreadModel(ThreadGroup group, String name, int threadcount) {
        return addThreadModel(group, name, null, threadcount);
    }

    /** Register a thread model that runs the given Runnable on its threads */
    static public long addThreadModel(ThreadGroup group, String name, Runnable runnable, int threadcount) {
        ThreadModel threadModel = new ThreadModel(group, name, threadcount, runnable);
        return addThreadModel(threadModel);
    }

    /** Register an existing thread model with the thread pool */
    static public long addThreadModel(ThreadModel threadModel) {
        threadMap.put(threadModel.getId(), threadModel);
        return threadModel.getId();
    }

    /** Add a task to the thread model with the given id */
    static public boolean addTask(long threadId, TaskEvent task) {
        ThreadModel threadModel = getThreadModel(threadId);
        if (threadModel != null) {
            threadModel.addTask(task);
            return true;
        }
        return false;
    }

    /** Add a timer task to the thread model with the given id */
    static public boolean addTimerTask(long threadId, TimerTaskEvent task) {
        ThreadModel threadModel = getThreadModel(threadId);
        if (threadModel != null) {
            threadModel.addTimer(task);
            return true;
        }
        return false;
    }

    /** Add a task to the thread model that owns the current thread */
    static public boolean addCurrentThreadTask(TaskEvent task) {
        Thread currentThread = Thread.currentThread();
        if (currentThread instanceof ThreadModel.MyThread) {
            long threadId = currentThread.getId();
            ThreadModel threadModel = getThreadModel(threadId);
            if (threadModel != null) {
                threadModel.addTask(task);
                return true;
            }
        }
        return false;
    }

    /** Add a timer task to the thread model that owns the current thread */
    static public boolean addCurrentThreadTimerTask(TimerTaskEvent task) {
        Thread currentThread = Thread.currentThread();
        if (currentThread instanceof ThreadModel.MyThread) {
            long threadId = currentThread.getId();
            ThreadModel threadModel = getThreadModel(threadId);
            if (threadModel != null) {
                threadModel.addTimer(task);
                return true;
            }
        }
        return false;
    }
}
Next, let's look at how to use it.
Here is the thread example code from the previous article:
public static void main(String[] args) throws InterruptedException {
    // Threads in parallel: multiple threads execute multiple tasks/functions
    new Thread(new Run1()).start();
    new Thread(new Run2()).start();
}

// Task 1
static class Run1 implements Runnable {
    @Override
    public void run() {
        // Execute task 1
        run1();
        // Execute task 3
        run3();
    }
}

// Task 2
static class Run2 implements Runnable {
    @Override
    public void run() {
        // Execute task 3
        run3();
        // Execute task 1
        run1();
        // Execute task 2
        run2();
    }
}

// Task 1
public static void run1() {
    System.out.println("run1->" + System.currentTimeMillis());
}

// Task 2
public static void run2() {
    System.out.println("run2->" + System.currentTimeMillis());
}

// Task 3
public static void run3() {
    System.out.println("run3->" + System.currentTimeMillis());
}

I switched that code over to the thread model:
public static void main(String[] args) throws InterruptedException {
    // Threads in parallel: multiple threads execute multiple tasks/functions
    long test1 = ThreadPool.addThreadModel("test thread-1");
    long test2 = ThreadPool.addThreadModel("test thread-2");
    // Add tasks
    ThreadPool.addTask(test1, new Run1());
    ThreadPool.addTask(test2, new Run2());
    // Add timer tasks
    ThreadPool.addTimerTask(test1, new TimerRun1());
    ThreadPool.addTimerTask(test2, new TimerRun2());
}

// Task 1
static class Run1 extends TaskEvent {
    @Override
    public void run() {
        // Execute task 1
        run1();
        // Execute task 3
        run3();
    }
}

// Timer task 1
static class TimerRun1 extends TimerTaskEvent {
    public TimerRun1() {
        super(500); // every 500 ms, unlimited executions
    }

    @Override
    public void run() {
        // Execute task 1
        run1();
        // Execute task 3
        run3();
    }
}

// Task 2
static class Run2 extends TaskEvent {
    @Override
    public void run() {
        // Execute task 3
        run3();
        // Execute task 1
        run1();
        // Execute task 2
        run2();
    }
}

// Timer task 2
static class TimerRun2 extends TimerTaskEvent {
    public TimerRun2() {
        super(500); // every 500 ms, unlimited executions
    }

    @Override
    public void run() {
        // Execute task 3
        run3();
        // Execute task 1
        run1();
        // Execute task 2
        run2();
    }
}

// Task 1
public static void run1() {
    System.out.println("run1->" + System.currentTimeMillis());
}

// Task 2
public static void run2() {
    System.out.println("run2->" + System.currentTimeMillis());
}

// Task 3
public static void run3() {
    System.out.println("run3->" + System.currentTimeMillis());
}

Next, let's look at the execution result:
run1->1472120543013
run3->1472120543013
run3->1472120543017
run1->1472120543017
run2->1472120543017
run1->1472120543517
run3->1472120543517
run2->1472120543517
run1->1472120544018
run3->1472120544018
run2->1472120544018
run1->1472120544520
run3->1472120544520
run2->1472120544520
run1->1472120545021
run3->1472120545021
run2->1472120545021
run1->1472120545521
run3->1472120545521
Everything works as expected.
At this point, my custom thread model is complete.
So what are the advantages and disadvantages?
The advantage is that the data flow is very clearly controlled: I can see what each thread is currently executing, monitor for stuck threads, and run timer tasks.
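The stuck-thread check can be sketched in isolation. This is a simplified stand-in for `showStackTrace`, assuming the same convention that a start time of 0 means the worker is idle; `StuckCheckDemo`, `Worker` and `findStuck` are hypothetical names, and the current time is passed in so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class StuckCheckDemo {
    /** Stand-in for MyThread's bookkeeping: lastExecuteTime is 0 while the worker is idle. */
    static final class Worker {
        final String name;
        volatile long lastExecuteTime;

        Worker(String name, long lastExecuteTime) {
            this.name = name;
            this.lastExecuteTime = lastExecuteTime;
        }
    }

    /** Returns the names of workers whose current task has run longer than thresholdMs. */
    static List<String> findStuck(List<Worker> workers, long now, long thresholdMs) {
        List<String> stuck = new ArrayList<>();
        for (Worker worker : workers) {
            long started = worker.lastExecuteTime; // read once: the worker may reset it concurrently
            if (started != 0 && now - started > thresholdMs) {
                stuck.add(worker.name);
            }
        }
        return stuck;
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(
                new Worker("map-1", 0),     // idle
                new Worker("chat-1", 9000), // busy for 1 s at now = 10000
                new Worker("team-1", 4000)  // busy for 6 s at now = 10000
        );
        System.out.println(findStuck(workers, 10000, 5000)); // prints [team-1]
    }
}
```

Checking for 0 explicitly is an alternative to the upper bound used in `showStackTrace`; both exist to skip idle workers.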
The disadvantage is that this custom thread model still cannot, by itself, solve thread data safety and critical sections; when data is shared between threads, it still has to be protected with locks or other means at the appropriate time.
I hope the experts will point out any shortcomings so that I can correct them right away.
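A small sketch of that remaining problem: two single-thread executors stand in for two thread models (say, map and chat) that both update the same counter. Each model is serial on its own, but the counter is touched by two threads, so it needs its own protection; here an `AtomicLong` is used as one possible choice. `SharedAcrossModelsDemo`, `countLogins` and the executor names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SharedAcrossModelsDemo {
    /** Runs perModel increments on each of two "thread models"; returns the final count. */
    static long countLogins(int perModel) {
        // Shared by both models, so a plain long field would lose updates
        AtomicLong onlinePlayers = new AtomicLong();
        ExecutorService mapThread = Executors.newSingleThreadExecutor();
        ExecutorService chatThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < perModel; i++) {
            mapThread.execute(onlinePlayers::incrementAndGet);
            chatThread.execute(onlinePlayers::incrementAndGet);
        }
        mapThread.shutdown();
        chatThread.shutdown();
        try {
            if (!mapThread.awaitTermination(10, TimeUnit.SECONDS)
                    || !chatThread.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return onlinePlayers.get();
    }

    public static void main(String[] args) {
        System.out.println(countLogins(10000));
    }
}
```

An alternative that stays within the thread model idea is to give the shared data a single owning model and have the other models submit tasks to it instead of touching the data directly.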