本文實例為大家分享了讀取用戶登入出日誌並上傳服務端的具體實現代碼,供大家參考,具體內容如下
該客戶端運行在給用戶提供unix服務的服務器上。用來讀取並收集該服務器上用戶的上下線信息,並進行配對整理後發送給服務端匯總。
具體實現代碼:
1. DMSServer.java
package com.dms; import java.io.BufferedReader;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.net.ServerSocket;import java.net.Socket;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue; import org.dom4j.Document;import org.dom4j.Element;import org.dom4j.io.SAXReader; /** * DMS服務端,用來接收每個客戶端發送過來的* 配對日誌並保存在本地文件中* @author Administrator * */public class DMSServer { //屬性定義//用來接收客戶端連接的服務端的ServerSocket private ServerSocket server; //用來管理處理客戶端請求的線程的線程池private ExecutorService threadPool; //保存所有客戶端發送過來配對日誌的文件private File serverLogFile; //消息隊列private BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(); public DMSServer() throws Exception{ try { System.out.println("服務端正在初始化..."); //1 解析配置文件server-config.xml Map<String,String> config = loadConfig(); //2 根據配置文件內容初始化屬性init(config); System.out.println("服務端初始化完畢..."); } catch (Exception e) { System.out.println("初始化服務端失敗!"); throw e; } } /** * 構造方法初始化第一步,解析配置文件* @return 返回的Map中保存的是配置文件中的* 每一條內容,其中key:標籤的名字, * value為標籤中間的文本* @throws Exception */ private Map<String,String> loadConfig() throws Exception{ try { SAXReader reader = new SAXReader(); Document doc = reader.read(new File("server-config.xml")); Element root = doc.getRootElement(); Map<String,String> config = new HashMap<String,String>(); /* * 獲取<config>標籤中的所有子標籤* 並將每一個子標籤的名字作為key,中間的* 文本作為value存入Map集合*/ List<Element> list = root.elements(); for(Element e : list){ String key = e.getName(); String value = e.getTextTrim(); config.put(key, value); } return config; } catch (Exception e) { System.out.println("解析配置文件異常!"); e.printStackTrace(); throw e; } } /** * 構造方法初始化第二步,根據配置項初始化屬性* @param config * @throws Exception */ private void init(Map<String,String> config) throws Exception{ /* * 用配置文件中的<logrecfile>初始化屬性:serverLogFile * 用配置文件中的<threadsum>初始化屬性:threadPool,這裡創建固定大小線程池。該值作為線程池線程數量* 用配置文件中的<serverport>初始化屬性:server,這裡這個值為ServerSocket的服務端口*/ this.server = new ServerSocket( Integer.parseInt(config.get("serverport")) ); this.serverLogFile = new File( config.get("logrecfile") ); this.threadPool = Executors.newFixedThreadPool( Integer.parseInt(config.get("threadsum")) ); } /** * 服務端開始工作的方法* @throws Exception */ public void start() throws Exception{ /* * 實現要求: * 首先單獨啟動一個線程,用來運行SaveLogHandler * 這個任務,目的是保存所有配對日誌* 然後開始循環監聽服務端端口,一旦一個客戶端連接了, * 就實例化一個ClientHander,然後將該任務交給線程池* 使其分配線程來處理與該客戶端的交互。 * */ try { System.out.println("服務端開始工作..."); SaveLogHandler slh=new SaveLogHandler(); new Thread(slh).start(); while(true){ Socket socket=server.accept(); threadPool.execute(new ClientHandler(socket)); } } catch (Exception e) { e.printStackTrace(); throw e; } } public static void main(String[] args) { try { DMSServer server = new DMSServer(); server.start(); } catch (Exception e) { System.out.println("啟動服務端失敗!"); } } /** * 該線程負責從消息隊列中取出每一條配對日誌, * 並存入到serverLogFile文件* @author Administrator * */ private class SaveLogHandler implements Runnable{ public void run(){ PrintWriter pw = null; try { pw = new PrintWriter( new FileOutputStream( serverLogFile,true ) ); while(true){ if(messageQueue.size()>0){ pw.println(messageQueue.poll()); }else{ pw.flush(); Thread.sleep(500); } } } catch (Exception e) { e.printStackTrace(); } finally{ if(pw != null){ pw.close(); } } } } /** * 處理一個指定客戶端請求* @author Administrator * */ private class ClientHandler implements Runnable{ private Socket socket; public ClientHandler(Socket socket){ this.socket = socket; } public void run(){ /* * 思路: * 首先接收客戶端發送過來的所有配對日誌, * 直到讀取到"OVER"為止,然後將這些配對* 日誌保存到本地的文件中,並回复客戶端* "OK" * 執行步驟: * 1:通過Socket創建輸出流,用來給客戶端* 發送響應* 2:通過Socket創建輸入流,讀取客戶端發送* 過來的日誌* 3:循環讀取客戶端發送過來的每一行字符串,並* 先判斷是否為字符串"OVER",若不是,則是* 一條配對日誌,那麼保存到本地文件,若是, * 則停止讀取。 * 4:成功讀取所有日誌後回复客戶端"OK" */ PrintWriter pw = null; try { //1 pw = new PrintWriter( new OutputStreamWriter( socket.getOutputStream(),"UTF-8" ) ); //2 BufferedReader br = new BufferedReader( new InputStreamReader( socket.getInputStream(),"UTF-8" ) ); //3 String message = null; while((message = br.readLine())!=null){ if("OVER".equals(message)){ break; } //將該日誌寫入文件保存messageQueue.offer(message); } //4 pw.println("OK"); pw.flush(); } catch (Exception e) { e.printStackTrace(); pw.println("ERROR"); pw.flush(); } finally{ try { //與客戶端斷開連接釋放資源socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }} 2. DMSClient.java
package com.dms; import java.io.BufferedReader;import java.io.File;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.io.RandomAccessFile;import java.net.Socket;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry;import java.util.Set; import org.dom4j.Document;import org.dom4j.Element;import org.dom4j.io.SAXReader; import com.dms.bo.LogData;import com.dms.bo.LogRec; /** * 該客戶端運行在給用戶提供unix服務的服務器上。 * 用來讀取並收集該服務器上用戶的上下線信息,並* 進行配對整理後發送給服務端匯總。 * @author Administrator * */public class DMSClient { //屬性定義//第一步:解析日誌所需屬性//unix系統日誌文件private File logFile; //保存解析後日誌的文件private File textLogFile; //書籤文件private File lastPositionFile; //每次解析日誌的條目數private int batch; //第二步:配對日誌所需要屬性//保存配對日誌的文件private File logRecFile; //保存未配對日誌的文件private File loginLogFile; //第三步:發送日誌所需要屬性//服務端地址private String serverHost; //服務端端口private int serverPort; /** * 構造方法,用來初始化客戶端* @throws Exception */ public DMSClient() throws Exception{ try { //1 解析配置文件config.xml Map<String,String> config = loadConfig(); //打樁System.out.println(config); //2 根據配置文件內容初始化屬性init(config); } catch (Exception e) { System.out.println("初始化失敗!"); throw e; } } /** * 構造方法初始化第二步,根據配置項初始化屬性* @param config * @throws Exception */ private void init(Map<String,String> config) throws Exception{ try { logFile = new File( config.get("logfile") ); textLogFile = new File( config.get("textlogfile") ); lastPositionFile = new File( config.get("lastpositionfile") ); batch = Integer.parseInt( config.get("batch") ); logRecFile = new File( config.get("logrecfile") ); loginLogFile = new File( config.get("loginlogfile") ); serverHost = config.get("serverhost"); serverPort = Integer.parseInt( config.get("serverport") ); } catch (Exception e) { System.out.println("初始化屬性失敗!"); e.printStackTrace(); throw e; } } /** * 構造方法初始化第一步,解析配置文件* @return 返回的Map中保存的是配置文件中的* 每一條內容,其中key:標籤的名字, * value為標籤中間的文本* @throws Exception */ private Map<String,String> loadConfig() throws Exception{ try { SAXReader reader = new SAXReader(); Document doc = reader.read(new File("config.xml")); Element root = doc.getRootElement(); Map<String,String> config = new HashMap<String,String>(); /* * 獲取<config>標籤中的所有子標籤* 並將每一個子標籤的名字作為key,中間的* 文本作為value存入Map集合*/ List<Element> list = root.elements(); for(Element e : list){ String key = e.getName(); String value = e.getTextTrim(); config.put(key, value); } return config; } catch (Exception e) { System.out.println("解析配置文件異常!"); e.printStackTrace(); throw e; } } /** * 客戶端開始工作的方法* 循環執行三步: * 1:解析日誌* 2:配對日誌* 3:發送日誌*/ public void start(){ parseLogs(); matchLogs(); sendLogs();// while(true){// //解析日誌// if(!parseLogs()){// continue;// }// //配對日誌// if(!matchLogs()){// continue;// }// //發送日誌// sendLogs();// } } /** * 第三步:發送日誌* @return true:發送成功* false:發送失敗*/ private boolean sendLogs(){ /* * 實現思路: * 將logRecFile文件中的所有配對日誌讀取* 出來然後連接上服務端並發送過去,若服務端* 全部接收,就可以將該文件刪除,表示發送* 完畢了。 * 實現步驟: * 1:logRecFile文件必須存在* 2:將所有配對日誌讀取出來並存入一個集合* 等待發送* 3:通過Socket連接服務端* 4:創建輸出流* 5:順序將所有配對日誌按行發送給服務端* 6:單獨發送一個字符串"OVER"表示所有日誌* 均已發送完畢* 7:創建輸入流* 8:讀取服務端發送回來的響應字符串* 9:若響應的字符串為"OK",表示服務端正常* 接收了所有日誌,這時就可以將logRecFile * 文件刪除並返回true表示發送完畢。 * */ Socket socket = null; try { //1 if(!logRecFile.exists()){ System.out.println(logRecFile+"不存在!"); return false; } //2 List<String> matches = IOUtil.loadLogRec(logRecFile); //3 socket = new Socket(serverHost,serverPort); //4 PrintWriter pw = new PrintWriter( new OutputStreamWriter( socket.getOutputStream(),"UTF-8" ) ); //5 for(String log : matches){ pw.println(log); } //6 pw.println("OVER"); pw.flush(); //7 BufferedReader br = new BufferedReader( new InputStreamReader( socket.getInputStream(),"UTF-8" ) ); //8 String response = br.readLine(); //9 if("OK".equals(response)){ logRecFile.delete(); return true; }else{ System.out.println("發送日誌失敗!"); return false; } } catch (Exception e) { System.out.println("發送日誌失敗!"); e.printStackTrace(); } finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } return false; } /** * 第二步:配對日誌* @return true:配對成功* false:配對失敗*/ private boolean matchLogs(){ /* * 實現思路: * 將第一步解析的新日誌,與上次為配對成功* 的登入日誌全部讀取出來,然後再按照user, * pid相同,type一個是7,一個是8進行配對。 * 只要能找到類型為8的,一定可以找到一個* 能與之配對的登入日誌。 * * 實現步驟: * 1:必要的判斷* 1.1:logRecFile是否存在,存在則不再* 進行新的配對工作,避免覆蓋。 * 1.2:textLogFile文件必須存在。 * 2:讀取textLogFile將日誌讀取出來,並* 存入到集合中。 (若干LogData實例) * 3:若loginLogFile文件若存在,則說明* 有上次未配對成功的日誌,也將其讀取* 出來存入集合等待一起配對* 4:配對工作* 4.1:創建一個集合,用於保存所有配對日誌* 4.2:創建兩個Map分別保存登入日誌與登出日誌* 4.3:遍歷所有待配對的日誌,按照登入與登出* 分別存入兩個Map中, * 其中key:user,pid * value:LogData實例* 4.4:遍歷登出Map,並根據每條登出日誌的key * 去登入Map中找到對應的登入日誌,並* 以一個LogRec實例保存該配對日誌,然後* 存入配對日誌的集合中。並將該配對日誌* 中的登入日誌從登入Map中刪除。這樣一來* 登入Map中應當只剩下沒有配對的了。 * 5:將配對日誌寫入到logRecFile中* 6:將所有未配對日誌寫入到loginLogFile中* 7:將textLogFile文件刪除* 8:返回true,表示配對完畢* */ try { //1 //1.1 if(logRecFile.exists()){ return true; } //1.2 if(!textLogFile.exists()){ System.out.println(textLogFile+"不存在!"); return false; } //2 List<LogData> list = IOUtil.loadLogData(textLogFile); //3 if(loginLogFile.exists()){ list.addAll( IOUtil.loadLogData(loginLogFile) ); } //4 //4.1 List<LogRec> matches = new ArrayList<LogRec>(); //4.2 Map<String,LogData> loginMap = new HashMap<String,LogData>(); Map<String,LogData> logoutMap = new HashMap<String,LogData>(); //4.3 for(LogData logData : list){ String key = logData.getUser()+","+ logData.getPid(); if(logData.getType()==LogData.TYPE_LOGIN){ loginMap.put(key, logData); }else if(logData.getType()==LogData.TYPE_LOGOUT){ logoutMap.put(key, logData); } } //4.4 Set<Entry<String,LogData>> entrySet = logoutMap.entrySet(); for(Entry<String,LogData> e : entrySet){ LogData logout = e.getValue(); LogData login = loginMap.remove(e.getKey()); LogRec logRec = new LogRec(login,logout); matches.add(logRec); } //5 IOUtil.saveCollection(matches, logRecFile); //6 IOUtil.saveCollection( loginMap.values(),loginLogFile ); //7 textLogFile.delete(); //8 return true; } catch (Exception e) { System.out.println("配對日誌失敗!"); e.printStackTrace(); } return false; } /** * 第一步:解析日誌* @return true:解析成功* false:解析失敗*/ private boolean parseLogs(){ /* * 實現思路: * 循環讀取batch條日誌,然後將每條日誌中的* 5個信息解析出來,最終組成一個字符串,以* 行為單位,寫入到textLogFile文件中* * 實現步驟: * 1:必要的判斷工作* 1.1:為了避免解析的日誌還沒有被使用,而* 第一步又重複執行導致之前日誌被覆蓋* 的問題,這裡需要判斷,若保存解析後* 的日誌文件存在,則第一步不再執行。 * 該日誌文件會在第二步配對完畢後刪除。 * 1.2:logFile文件必須存在(wtmpx文件) * 1.3:是否還有日誌可以解析* 2:創建RandomAccessFile來讀取logFile * 3:將指針移動到上次最後讀取的位置,準備* 開始新的解析工作* 4:解析工作* 4.1:創建一個List集合,用於保存解析後* 的每一條日誌(LogData實例) * 4.2:循環batch次,解析每條日誌中的* 5項內容(user,pid,type,time,host) * 並用一個LogData實例保存,然後將* 該LogData實例存入集合* 5:將集合中的所有的日誌以行為單位保存到* textLogFile中* 6:保存書籤信息* 7:返回true,表示工作完畢* */ RandomAccessFile raf = null; try { //1 //1.1 if(textLogFile.exists()){ return true; } //1.2 if(!logFile.exists()){ System.out.println(logFile+"不存在!"); return false; } //1.3 long lastPosition = hasLogs(); //打樁// System.out.println(// "lastPosition:"+lastPosition// ); if(lastPosition<0){ System.out.println("沒有日誌可以解析了!"); return false; } //2 raf = new RandomAccessFile(logFile,"r"); //3 raf.seek(lastPosition); //4 List<LogData> list = new ArrayList<LogData>(); for(int i=0;i<batch;i++){ //每次解析前都判斷是否還有日誌可以解析if(logFile.length()-lastPosition <LogData.LOG_LENGTH ){ break; } //解析user raf.seek(lastPosition+LogData.USER_OFFSET); String user = IOUtil.readString( raf, LogData.USER_LENGTH ).trim(); //解析PID raf.seek(lastPosition+LogData.PID_OFFSET); int pid = raf.readInt(); //解析TYPE raf.seek(lastPosition+LogData.TYPE_OFFSET); short type = raf.readShort(); //解析TIME raf.seek(lastPosition+LogData.TIME_OFFSET); int time = raf.readInt(); //解析HOST raf.seek(lastPosition+LogData.HOST_OFFSET); String host = IOUtil.readString( raf, LogData.HOST_LENGTH ).trim(); LogData log = new LogData(user, pid, type, time, host); list.add(log); //打樁// System.out.println(log); //當解析完一條日誌後,更新lastPosition lastPosition = raf.getFilePointer(); } //5 IOUtil.saveCollection(list, textLogFile); //6 保存書籤文件IOUtil.saveLong( lastPosition, lastPositionFile); //7 return true; } catch (Exception e) { System.out.println("解析日誌失敗!"); e.printStackTrace(); } finally{ if(raf != null){ try { raf.close(); } catch (IOException e) { e.printStackTrace(); } } } return false; } /** * 第一步解析日誌中的一個環節, * 根據書籤文件記錄的位置判斷是否還有* 日誌可以解析,若有,則將上次最後的位置* 返回,若沒有則返回-1。 * @return */ private long hasLogs(){ try { /* * 若lastPositionFile不存在,則說明* 從來沒有解析過,那麼從頭開始解析即可*/ if(!lastPositionFile.exists()){ return 0; } long lastPosition = IOUtil.readLong(lastPositionFile); if(logFile.length()-lastPosition >=LogData.LOG_LENGTH){ return lastPosition; } } catch (Exception e) { e.printStackTrace(); } return -1; } public static void main(String[] args) { try { DMSClient client = new DMSClient(); client.start(); } catch (Exception e) { System.out.println("客戶端運行失敗!"); } }} 3. IOUtil.java
package com.dms; import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.InputStreamReader;import java.io.PrintWriter;import java.io.RandomAccessFile;import java.util.ArrayList;import java.util.Collection;import java.util.List; import com.dms.bo.LogData; /** * 該類是一個工具類,負責客戶端的IO操作* @author Administrator * */public class IOUtil { /** * 從給定的文件中讀取每一行字符串(配對日誌) * 並存入一個集合後返回* @param file * @return * @throws Exception */ public static List<String> loadLogRec(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); List<String> list = new ArrayList<String>(); String line = null; while((line = br.readLine())!=null){ list.add(line); } return list; } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br != null){ br.close(); } } } /** * 從給定的文件中讀取每一條配對日誌,並存入* 一個集合中然後返回。 * @param file * @return * @throws Exception */ public static List<LogData> loadLogData(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); List<LogData> list = new ArrayList<LogData>(); String line = null; while((line = br.readLine())!=null){ LogData logData = new LogData(line); list.add(logData); } return list; } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br!=null){ br.close(); } } } /** * 將指定的long值以字符串的形式寫入到* 給定文件的第一行* @param l * @param file * @throws Exception */ public static void saveLong( long lon,File file) throws Exception{ PrintWriter pw = null; try { pw = new PrintWriter(file); pw.println(lon); } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(pw != null){ pw.close(); } } } /** * 將集合中每個元素的toString方法返回的字符串* 以行為單位寫入到指定文件中。 * @param c * @param file * @throws Exception */ public static void saveCollection( Collection c,File file) throws Exception{ PrintWriter pw = null; try { pw = new PrintWriter(file); for(Object o : c){ pw.println(o); } } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(pw != null){ pw.close(); } } } /** * 從給定的RandomAccessFile當前位置開始連續* 讀取length個字節,並轉換為字符串後返回* @param raf * @param length * @return * @throws Exception */ public static String readString( RandomAccessFile raf,int length) throws Exception{ try { byte[] data = new byte[length]; raf.read(data); return new String(data,"ISO8859-1"); } catch (Exception e) { e.printStackTrace(); throw e; } } /** * 從給定文件中讀取第一行字符串,然後將其* 轉換為一個long值後返回* @param file * @return * @throws Exception */ public static long readLong(File file) throws Exception{ BufferedReader br = null; try { br = new BufferedReader( new InputStreamReader( new FileInputStream( file ) ) ); String line = br.readLine(); return Long.parseLong(line); } catch (Exception e) { e.printStackTrace(); throw e; } finally{ if(br != null){ br.close(); } } }}
4. config.xml
<?xml version="1.0" encoding="UTF-8"?><config> <!-- unix系統日誌文件名--> <logfile>wtmpx</logfile> <!-- 保存解析後日誌的文件名--> <textlogfile>log.txt</textlogfile> <!-- 書籤文件名--> <lastpositionfile>last-position.txt</lastpositionfile> <!-- 每次解析日誌的條目數--> <batch>10</batch> <!-- 配對日誌文件名--> <logrecfile>logrec.txt</logrecfile> <!-- 未配對日誌文件名--> <loginlogfile>login.txt</loginlogfile> <!-- 服務端地址--> <serverhost>localhost</serverhost> <!-- 服務端端口--> <serverport>8088</serverport></config>
5. server-config.xml
<?xml version="1.0" encoding="UTF-8"?><config> <!-- 服務端保存配對日誌文件的文件名--> <logrecfile>server-logs.txt</logrecfile> <!-- 線程池線程數量--> <threadsum>30</threadsum> <!-- 服務端端口--> <serverport>8088</serverport></config>
以上就是本文的全部內容,希望對大家的學習有所幫助。