看過我之前文章的園友可能知道我是做遊戲開發,我的很多思路和出發點是按照遊戲思路來處理的,所以和web的話可能會有衝突,不相符合。
來說說為啥我要自定義線程模型呢?
按照我做的mmorpg或者mmoarpg遊戲劃分,線程被劃分為,主線程,全局同步線程,聊天線程,組隊線程,地圖線程,以及地圖消息分發派送線程等;
一些列,都需要根據我的劃分,以及數據流向做控制。
遊戲服務器,主要要做的事情,肯定是接受玩家的命令請求-> 相應的操作-> 返回結果;
在服務器端所有的消息都會註冊到消息管理器裡,然後消息在註冊的時候會指定線程模型,
如果消息需要提交到玩家所在地圖線程進行處理的話註冊消息的時候就要把線程模型用(地圖消息分發派送線程);
下面我們先來分析線程模型;
在看線程模型代碼之前我先看看我的任務模型
package net.sz.engine.thread;import java.io.Serializable;import org.apache.log4j.Logger;import net.sz.engine.structs.ObjectAttribute;import net.sz.engine.structs.ObjectGlobal;/** * 任務模型* * <br> * author 失足程序員<br> * mail [email protected]<br> * phone 13882122019<br> */public abstract class TaskEvent implements Serializable, Cloneable { private static final Logger log = Logger.getLogger(TaskEvent.class); private static final long serialVersionUID = 4196020659994845804L; //運行時數據private transient final ObjectAttribute runOther = new ObjectAttribute; //任務創建的時間protected long createTime; //任務的唯一id protected long taskId; //取消的任務protected boolean cancel = false; public TaskEvent { this.runOther.put("submitTime", System.currentTimeMillis); createTime = System.currentTimeMillis; cancel = false; taskId = ObjectGlobal.getUUID; } public long getCreateTime { return createTime; } public void setCreateTime(long createTime) { this.createTime = createTime; } public long getSubmitTime { return this.runOther.getlongValue("submitTime"); } public ObjectAttribute getRunOther { return runOther; } public boolean isCancel { return cancel; } public void setCancel(boolean cancel) { this.cancel = cancel; } public abstract void run; @Override public Object clone throws CloneNotSupportedException { return super.clone; //To change body of generated methods, choose Tools | Templates. }}
package net.sz.engine.thread;/** * 定時器執行器* * <br> * author 失足程序員<br> * mail [email protected]<br> * phone 13882122019<br> */public abstract class TimerTaskEvent extends TaskEvent { private static final long serialVersionUID = -8331296295264699207L; /** * 開始執行的時間*/ protected long startTime; /** * 是否一開始執行一次*/ protected boolean startAction; /** * 結束時間*/ protected long endTime; /** * 執行次數*/ protected int actionCount; /** * 間隔執行時間*/ protected int intervalTime; /** * * @param startTime 指定開始時間* @param isStartAction 是否一開始就執行一次* @param endTime 指定結束時間* @param actionCount 指定執行次數* @param intervalTime 指定間隔時間*/ public TimerTaskEvent(long startTime, boolean isStartAction, long endTime, int actionCount, int intervalTime) { super; this.startTime = startTime; this.startAction = isStartAction; this.endTime = endTime; this.actionCount = actionCount; this.intervalTime = intervalTime; } /** * 指定任務的開始執行時間* * @param startTime 指定開始時間* @param isStartAction 是否一開始就執行一次* @param actionCount 指定執行次數* @param intervalTime 指定間隔時間*/ public TimerTaskEvent(long startTime, boolean isStartAction, int actionCount, int intervalTime) { this(startTime, isStartAction, 0, actionCount, intervalTime); } /** * 指定結束時間已結束時間為準,執行次數不一定夠* * @param isStartAction 是否一開始就執行一次* @param endTime 指定結束時間* @param actionCount 指定執行次數* @param intervalTime 指定間隔時間* */ public TimerTaskEvent(boolean isStartAction, long endTime, int actionCount, int intervalTime) { this(0, isStartAction, endTime, actionCount, intervalTime); } /** * 指定開始時間,和結束時間* * @param startTime 指定開始時間* @param endTime 指定結束時間* @param intervalTime 指定間隔時間*/ public TimerTaskEvent(long startTime, long endTime, int intervalTime) { this(startTime, false, endTime, -1, intervalTime); } /** * 指定的執行次數和間隔時間* * @param actionCount 指定執行次數* @param intervalTime 指定間隔時間*/ public TimerTaskEvent(int actionCount, int intervalTime) { this(0, false, 0, actionCount, intervalTime); } /** * 提交後指定的時間無限制執行* * @param intervalTime 指定間隔時間*/ public TimerTaskEvent(int intervalTime) { this(0, false, 0, -1, intervalTime); } public long getStartTime { return startTime; } public void setStartTime(long startTime) { this.startTime = startTime; } public boolean isStartAction { return startAction; } public void setStartAction(boolean startAction) { this.startAction = startAction; } public long getEndTime { return endTime; } public void setEndTime(long endTime) { this.endTime = endTime; } public int getActionCount { return actionCount; } public void setActionCount(int actionCount) { this.actionCount = actionCount; } public int getIntervalTime { return intervalTime; } public void setIntervalTime(int intervalTime) { this.intervalTime = intervalTime; }}
這裡是任務模型和定時器任務模型;
package net.sz.engine.thread;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ConcurrentLinkedQueue;import net.sz.engine.structs.ObjectGlobal;import net.sz.engine.utils.MailUtil;import net.sz.engine.utils.StringUtil;import org.apache.log4j.Logger;import org.jboss.jandex.Main;/** * 線程模型* <br> * author 失足程序員<br> * mail [email protected]<br> * phone 13882122019<br> */public class ThreadModel implements Runnable { private static final Logger log = Logger.getLogger(ThreadModel.class); private static long threadID = 0; protected static final Object SYN_OBJECT = new Object; protected long tid; protected String name; protected long lastSendMail = 0; protected final ArrayList<MyThread> threads = new ArrayList<>; /** * 任務列表線程安全的任務列表*/ //protected final List<TaskModel> taskQueue = new ArrayList<>; protected final ConcurrentLinkedQueue<TaskEvent> taskQueue = new ConcurrentLinkedQueue<>; /** * */ protected final List<TimerTaskEvent> timerQueue = new ArrayList<>; // false標識刪除線程protected volatile boolean runing = true; public ThreadModel(ThreadGroup group) { this(group, "無名", 1); } public ThreadModel(String name) { this(ThreadPool.UnknownThreadGroup, name, 1); } public ThreadModel(ThreadGroup group, String name, int threadCount) { this(group, name, threadCount, null); } public ThreadModel(ThreadGroup group, String name, int threadCount, Runnable runnable) { synchronized (SYN_OBJECT) {threadID++;tid = threadID; } for (int i = 1; i <= threadCount; i++) { MyThread thread;if (runnable == null) {thread = new MyThread(tid, group, this, name + "-" + tid + "-" + i);} else {thread = new MyThread(tid, group, runnable, name + "-" + tid + "-" + i); } thread.start; threads.add(thread); } this.name = name; } /** * 線程名字* * @return */ public String getName { return name; } /** * 獲取線程的自定義id * * @return */ public long getId { return this.tid; } /** * 增加新的任務每增加一個新任務,都要喚醒任務隊列* * @param runnable */ public void addTask(TaskEvent runnable) { taskQueue.add(runnable); synchronized (taskQueue) {/* 喚醒隊列, 開始執行*/ taskQueue.notifyAll; } } /** * 向線程添加定時器任務* * @param runnable */ public void addTimer(TimerTaskEvent runnable) { synchronized (timerQueue) {if (runing) {//一開始執行一次if (runnable.startAction) { addTask(runnable); } timerQueue.add(runnable);} else {log.error("線程已經停止"); } } } // <editor-fold defaultstate="collapsed" desc="定時器線程執行器public void timerRun"> /** * 定時器線程執行器*/ public void timerRun { ArrayList<TimerTaskEvent> taskModels; synchronized (timerQueue) {// 隊列不為空的情況下取出隊列定時器任務taskModels = new ArrayList<>(timerQueue); } if (!taskModels.isEmpty) {for (TimerTaskEvent timerEvent : taskModels) {int execCount = timerEvent.getRunOther.getintValue("Execcount");long lastTime = timerEvent.getRunOther.getlongValue("LastExecTime");long nowTime = System.currentTimeMillis;if (lastTime == 0) {timerEvent.getRunOther.put("LastExecTime", nowTime);} else if (timerEvent.isCancel) {//如果任務已經取消synchronized (timerQueue) { timerQueue.remove(timerEvent); }log.debug("清理定時器任務:" + timerEvent.getClass.getName);} else if (nowTime > timerEvent.getStartTime // 是否滿足開始時間&& (nowTime - timerEvent.getSubmitTime > timerEvent.getIntervalTime)// 提交以後是否滿足了間隔時間&& (timerEvent.getEndTime <= 0 || nowTime < timerEvent.getEndTime) // 判斷結束時間&& (nowTime - lastTime >= timerEvent.getIntervalTime)) // 判斷上次執行到目前是否滿足間隔時間{// 提交執行定時器最先執行this.addTask(timerEvent);// 記錄execCount++;timerEvent.getRunOther.put("Execcount", execCount);timerEvent.getRunOther.put("LastExecTime", nowTime);nowTime = System.currentTimeMillis;// 判斷刪除條件if ((timerEvent.getEndTime > 0 && nowTime < timerEvent.getEndTime)|| (timerEvent.getActionCount > 0 && timerEvent.getActionCount <= execCount)) {synchronized (timerQueue) { timerQueue.remove(timerEvent); }log.debug("清理定時器任務:" + timerEvent.getClass.getName); } } } } } // </editor-fold> // <editor-fold defaultstate="collapsed" desc="查看線程堆棧public void showStackTrace"> /** * * 查看線程堆棧*/ public void showStackTrace { StringBuilder buf = new StringBuilder; for (MyThread currentThread : threads) {long procc = System.currentTimeMillis - currentThread.getLastExecuteTime;if (procc > 5 * 1000 && procc < 864000000L) {//小於10天//因為多線程操作時間可能不准確buf.append("線程[") .append(currentThread.getName).append("]可能已卡死-> ").append(procc / 1000f).append("s/n ").append("執行任務:") .append(currentThread.getLastCommand.getClass.getName);try {StackTraceElement elements = currentThread.getStackTrace;for (int i = 0; i < elements.length; i++) {buf.append("/n ") .append(elements[i].getClassName).append(".") .append(elements[i].getMethodName).append("(").append(elements[i].getFileName).append(";").append(elements[i].getLineNumber).append(")"); }} catch (Exception e) { buf.append(e); }buf.append("/n++++++++++++++++++++++++++++++++++"); } } String toString = buf.toString; if (!StringUtil.isNullOrEmpty(toString)) { log.error(toString);if (System.currentTimeMillis - lastSendMail > 5 * 60 * 1000) {lastSendMail = System.currentTimeMillis;MailUtil.sendMail("線程執行已卡死-> 遊戲id-" + ObjectGlobal.GameID + " 平台-" + ObjectGlobal.Platform + " 服務器id-" + ObjectGlobal.ServerID, toString); } } } // </editor-fold> @Override public void run { MyThread currentThread = (MyThread) Thread.currentThread; while (runing) {while (taskQueue.isEmpty && runing) {try {/* 任務隊列為空,則等待有新任務加入從而被喚醒*/synchronized (taskQueue) {taskQueue.wait(500); }} catch (InterruptedException ie) { log.error(ie); } }/* 取出任務執行*/if (runing) {currentThread.lastCommand = null;currentThread.lastCommand = taskQueue.poll; }if (currentThread.lastCommand != null) {if (currentThread.lastCommand.isCancel) {//如果任務已經取消continue; }/* 執行任務*/// r.setSubmitTimeL;currentThread.lastExecuteTime = System.currentTimeMillis;try { currentThread.lastCommand.run;} catch (Exception e) {log.error("工人<“" + currentThread.getName + "”> 執行任務<" + currentThread.lastCommand.getClass.getName + "> 遇到錯誤: ", e); }long timeL1 = System.currentTimeMillis - currentThread.lastExecuteTime;if (timeL1 <= 20) {} else if (timeL1 <= 100L) {log.info("工人<“" + currentThread.getName + "”> 完成了任務:" + currentThread.lastCommand.toString + " 執行耗時:" + timeL1);} else if (timeL1 <= 200L) {log.info("工人<“" + currentThread.getName + "”> 長時間執行完成任務:" + currentThread.lastCommand.toString + " “考慮”任務腳本邏輯耗時:" + timeL1);} else {log.info("工人<“" + currentThread.getName + "”> 超長時間執行完成任務:" + currentThread.lastCommand.toString + " “考慮是否應該刪除”任務腳本耗時:" + timeL1); }currentThread.lastExecuteTime = 0; } } log.error("線程結束, 工人<“" + Thread.currentThread.getName + "”>退出"); } /** * 自定義線程*/ public class MyThread extends Thread { /** * * @param tid 自定義線程id * @param group 分組* @param run 執行方法* @param name 線程名稱*/ public MyThread(long tid, ThreadGroup group, Runnable run, String name) {super(group, run, name);this._id = tid; } //線程的自定義id public long _id; //正在執行的任務public volatile TaskEvent lastCommand; //開始執行任務的時間public volatile long lastExecuteTime = 0; public TaskEvent getLastCommand {return lastCommand; } public long getLastExecuteTime {return lastExecuteTime; } /** * 返迴線程自定義id * * @return*/ @Override public long getId {return _id; } } /** * 停止線程,設置線程的停止狀態,並不會馬上終止線程*/ public void stop { this.runing = false; } public boolean isRuning { return runing; } @Override public String toString { return "Thread{" + "tid=" + tid + ",Name=" + this.getName + '}'; }}
我從ThreadModel 構造函數的
public ThreadModel(ThreadGroup group, String name, int threadCount, Runnable runnable) { synchronized (SYN_OBJECT) {threadID++;tid = threadID; } for (int i = 1; i <= threadCount; i++) { MyThread thread;if (runnable == null) {thread = new MyThread(tid, group, this, name + "-" + tid + "-" + i);} else {thread = new MyThread(tid, group, runnable, name + "-" + tid + "-" + i); } thread.start; threads.add(thread); } this.name = name; }可以看出,這裡我運行聲明一個或者多個MyThread 線程類
為什麼要這樣考慮呢打個比方,如果是處理日誌的寫入數據這種,沒有共享數據,沒有線程臨界區的處理流程,我可以考慮使用N個線程去處理這樣的工作;不會產生臟數據;
如果是想組隊請求,技能施法這種處理,我需要單隊列處理,那麼threadmodel裡面肯定只有一個MyThread 這樣不算阻塞模式串行執行(或隊列執行)把共享數據和線程臨界區的問題也解決了不再依賴鎖;
字很醜,請見諒
上面圖片看出,在每一個threadmodel 裡面都會兩個隊列,一個timertaskevent,一個是taskevent,會存在一個全局的timer thread;
全局的timer thread 的作用是用來定時去處理和發現threadmodel裡面timertaskevent需要執行了,就把他加入到taskevent隊里里面;最終執行是taskevent隊列
timertaskevent為什麼要存儲在對應的threadmodel裡面呢,那是因為比如,我A線程(threadmodel實例)運行一段時間後需要關閉,釋放資源了,那麼我還要去其他地方查找對應的timertask並移除掉;
package net.sz.engine.thread;import java.util.HashMap;import java.util.Map;/** * * <br> * author 失足程序員<br> * mail [email protected]<br> * phone 13882122019<br> */class TimerThread extends Thread { private static final Object SYN_OBJECT = new Object; public TimerThread { super(ThreadPool.GloblThreadGroup, "Global Timer Thread"); } @Override public void run { while (true) {synchronized (SYN_OBJECT) {try {SYN_OBJECT.wait(2);} catch (InterruptedException ex) { } }HashMap<Long, ThreadModel> hashMap = new HashMap<>(ThreadPool.getThreadMap);for (Map.Entry<Long, ThreadModel> entrySet : hashMap.entrySet) {Long key = entrySet.getKey;ThreadModel value = entrySet.getValue; value.timerRun; } } }}
線程模型的管理器
package net.sz.engine.thread;import java.util.HashMap;import java.util.concurrent.ConcurrentHashMap;import net.sz.engine.script.manager.ScriptManager;import net.sz.engine.timer.GlobTimerEvent;import net.sz.engine.timer.PrintlnServerMemoryTimerEvent;import org.apache.log4j.Logger;/** * 線程管理器* * <br> * author 失足程序員<br> * mail [email protected]<br> * phone 13882122019<br> */public class ThreadPool { static private final Logger log = Logger.getLogger(ThreadPool.class); static public final long GloblThread; static private final TimerThread GloblTimerThread; static final long CheckThreadTimerThreadModel; static public final ThreadGroup GloblThreadGroup = new ThreadGroup("Global ThreadGroup"); static public final ThreadGroup UnknownThreadGroup = new ThreadGroup(GloblThreadGroup, "Unknown ThreadGroup"); static private final ConcurrentHashMap<Long, ThreadModel> threadMap = new ConcurrentHashMap<>; public static void main(String[] args) { ThreadPool.addTimerTask(GloblThread, new TimerTaskEvent(1000) { @Overridepublic void run {log.error("ssssss"); } }); } static { //創建全局線程GloblThread = addThreadModel(GloblThreadGroup, "GloblThread"); //執行指定任務定時觸發腳步addTimerTask(GloblThread, new GlobTimerEvent(ScriptManager.getInstance.getBaseScriptEntry)); //查詢服務器消耗定時模型addTimerTask(GloblThread, new PrintlnServerMemoryTimerEvent); //創建定時器線程GloblTimerThread = new TimerThread; GloblTimerThread.start; //檢查線程卡死情況CheckThreadTimerThreadModel = addThreadModel(GloblThreadGroup, "Check Thread Timer Event"); addTimerTask(CheckThreadTimerThreadModel, new CheckThreadTimerEvent); } /** * 刪除指定id線程模型的時候回設置狀態為停止狀態* * @param tid * @return */ static public ThreadModel remove(long tid) { ThreadModel remove = threadMap.remove(tid); if (remove != null) { remove.stop; } return remove; } /** * 獲取線程池中所有線程* * @return */ static public ConcurrentHashMap<Long, ThreadModel> getThreadMap { return threadMap; } /** * 獲取線程池的一個線程* * @param threadId * @return */ static public ThreadModel getThreadModel(long threadId) { ThreadModel get = threadMap.get(threadId); if (get == null) {log.error("無法找到線程模型:" + threadId, new Exception("無法找到線程模型:" + threadId)); } return get; } /** * 向線程池註冊一個線程* <br> * 默認分組UnknownThreadGroup * * @param name 線程名稱* @return */ static public long addThreadModel(String name) { return addThreadModel(UnknownThreadGroup, name); } /** * 向線程池註冊一個線程* <br> * 默認分組UnknownThreadGroup * * @param name 線程名稱* @param threadcount 線程量* @return */ static public long addThreadModel(String name, int threadcount) { return addThreadModel(UnknownThreadGroup, name, threadcount); } /** * 向線程池註冊一個線程* * @param group 線程分組信息* @param name 線程名稱* @return */ static public long addThreadModel(ThreadGroup group, String name) { return addThreadModel(group, name, 1); } /** * 向線程池註冊一個線程* * @param group 線程分組信息* @param name 線程名稱* @param threadcount 線程量* @return */ static public long addThreadModel(ThreadGroup group, String name, int threadcount) { return addThreadModel(group, name, null, threadcount); } /** * 向線程池註冊一個線程* * @param group 線程分組信息* @param name 線程名稱* @param runnable * @param threadcount 線程量* @return */ static public long addThreadModel(ThreadGroup group, String name, Runnable runnable, int threadcount) { ThreadModel threadModel = new ThreadModel(group, name, threadcount, runnable); return addThreadModel(threadModel); } /** * 向線程池註冊一個線程* * @param threadModel */ static public long addThreadModel(ThreadModel threadModel) { threadMap.put(threadModel.getId, threadModel); return threadModel.getId; } /** * 添加任務* * @param threadId * @param task * @return */ static public boolean addTask(long threadId, TaskEvent task) { ThreadModel threadModel = getThreadModel(threadId); if (threadModel != null) { threadModel.addTask(task);return true; } return false; } /** * 添加定時器任務* * @param threadId * @param task * @return */ static public boolean addTimerTask(long threadId, TimerTaskEvent task) { ThreadModel threadModel = getThreadModel(threadId); if (threadModel != null) { threadModel.addTimer(task);return true; } return false; } /** * 添加任務,添加任務到當前線程* * @param task * @return */ static public boolean addCurrentThreadTask(TaskEvent task) { Thread currentThread = Thread.currentThread; if (currentThread instanceof ThreadModel.MyThread) {long threadId = currentThread.getId;ThreadModel threadModel = getThreadModel(threadId);if (threadModel != null) { threadModel.addTask(task);return true; } } return false; } /** * 添加定時器任務,添加任務到當前線程* * @param task * @return */ static public boolean addCurrentThreadTimerTask(TimerTaskEvent task) { Thread currentThread = Thread.currentThread; if (currentThread instanceof ThreadModel.MyThread) {long threadId = currentThread.getId;ThreadModel threadModel = getThreadModel(threadId);if (threadModel != null) { threadModel.addTimer(task);return true; } } return false; }}
接下來我們看看使用情況
上篇文章中線程介紹代碼
public static void main(String[] args) throws InterruptedException { //線程並行情況,有多個線程執行多個任務/函數new Thread(new Run1).start; new Thread(new Run2).start; } //任務1 static class Run1 implements Runnable { @Override public void run {//執行任務1 run1;//執行任務3 run3; } } //任務2 static class Run2 implements Runnable { @Override public void run {//執行任務3 run3;//執行任務1 run1;//執行任務2 run2; } } //任務1 public static void run1 { System.out.println("run1->" + System.currentTimeMillis); } //任務2 public static void run2 { System.out.println("run2->" + System.currentTimeMillis); } //任務3 public static void run3 { System.out.println("run3->" + System.currentTimeMillis); }我把代碼切換模式
public static void main(String[] args) throws InterruptedException { //線程並行情況,有多個線程執行多個任務/函數long test1 = ThreadPool.addThreadModel("測試線程-1"); long test2 = ThreadPool.addThreadModel("測試線程-2"); //添加任務ThreadPool.addTask(test1, new Run1); ThreadPool.addTask(test2, new Run2); //添加定時器任務ThreadPool.addTimerTask(test1, new TimerRun1); ThreadPool.addTimerTask(test2, new TimerRun2); } //任務1 static class Run1 extends TaskEvent { @Override public void run {//執行任務1 run1;//執行任務3 run3; } } //任務1 static class TimerRun1 extends TimerTaskEvent { public TimerRun1 {super(500);//500毫秒無限制執行} @Override public void run {//執行任務1 run1;//執行任務3 run3; } } //任務2 static class Run2 extends TaskEvent { @Override public void run {//執行任務3 run3;//執行任務1 run1;//執行任務2 run2; } } //任務2 static class TimerRun2 extends TimerTaskEvent { public TimerRun2 {super(500);//500毫秒無限制執行} @Override public void run {//執行任務3 run3;//執行任務1 run1;//執行任務2 run2; } } //任務1 public static void run1 { System.out.println("run1->" + System.currentTimeMillis); } //任務2 public static void run2 { System.out.println("run2->" + System.currentTimeMillis); } //任務3 public static void run3 { System.out.println("run3->" + System.currentTimeMillis); }接下來我們看看執行效果
run1->1472120543013run3->1472120543013run3->1472120543017run1->1472120543017run2->1472120543017run1->1472120543517run3->1472120543517run2->1472120543517run1->1472120544018run3->147 2120544018run2->1472120544018run1->1472120544520run3->1472120544520run2->1472120544520run1->1472120545021run3->1472120545021run2->1472120545021run1->1472120545521run3->1472120545521
一切正常;
這就是我的自定義線程模型;
到這裡我的自定義線程模型就算介紹完成了;
那麼優缺點在哪裡呢?
優點是,數據流程控制很清晰,包括現在執行情況,以及線程卡死監控和任務的定時器執行;
缺點,這個自定義線程模型依然不可能解決線程數據安全和臨界區問題,在適當的時候依然需要靠鎖或者其他形式來解決;
不足之處希望大神們指出,我好即時糾正。