Preface:
Yesterday I wrote up the initial design of a Socket chat program that I had taken the time to build; that post covered the program's overall design. For completeness, today's post records the design of the server side in detail. First, here is the overall design diagram of the Socket chat program, as shown below:
Function description:
The server does two main things. First, it blocks waiting for client sockets and processes their requests. Second, it monitors client heartbeats and removes any client that has not sent a heartbeat for a period of time. To do this it creates a ServerSocket and then starts two thread pools, one for each job (newFixedThreadPool and newScheduledThreadPool); the corresponding processing classes are SocketDispatcher and SocketSchedule. SocketDispatcher dispatches each socket request to a different SocketHandler according to the request type. SocketWrapper wraps a socket in a shell that records the time of the socket's latest interaction, and SocketHolder stores the collection of sockets that are currently interacting with the server.
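The two-pool arrangement described above can be tried in isolation. The following is a minimal sketch, not the article's code: `TwoPoolSketch` and its fields are hypothetical names, plain counters stand in for real sockets and heartbeat checks, and the worker pool size of 4 is an arbitrary assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the server's two pools; no real sockets are used. */
final class TwoPoolSketch {
    final AtomicInteger handled = new AtomicInteger(); // fake "client requests" processed
    final AtomicInteger sweeps = new AtomicInteger();  // fake "heartbeat checks" performed

    /** Submits fake client tasks and a periodic sweep, waits briefly, then stops both pools. */
    void run(int connections, long sweepPeriodMillis) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        ScheduledExecutorService watchdog = Executors.newScheduledThreadPool(1);
        CountDownLatch done = new CountDownLatch(connections);
        CountDownLatch firstSweep = new CountDownLatch(1);
        try {
            // Periodic job, playing the role of SocketSchedule.
            watchdog.scheduleAtFixedRate(() -> {
                sweeps.incrementAndGet();
                firstSweep.countDown();
            }, 0, sweepPeriodMillis, TimeUnit.MILLISECONDS);
            // One task per "connection", playing the role of SocketDispatcher.
            // Tasks beyond the 4 worker threads wait in the executor's queue.
            for (int i = 0; i < connections; i++) {
                workers.execute(() -> {
                    handled.incrementAndGet();
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
            firstSweep.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
            watchdog.shutdownNow();
        }
    }
}
```

Calling `new TwoPoolSketch().run(10, 50)` submits ten short tasks while the sweep runs every 50 ms. In the real server each dispatcher task lives as long as its socket, so the fixed pool size effectively caps the number of clients served at the same time.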
Specific implementation:
[Server.java]
Server is the entry point of the server side. Its start() method runs the accept loop on the ServerSocket created in the constructor: it blocks waiting for client connections and hands each accepted socket to a SocketDispatcher for dispatching. The SocketDispatchers run on a thread pool created with newFixedThreadPool; when the number of connections exceeds the pool size, the extra dispatchers wait in the pool's queue. scheduleAtFixedRate starts SocketSchedule as a periodic task that checks the clients' heartbeat packets. Both classes implement the Runnable interface. The following is the code for the server:
package yaolin.chat.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import yaolin.chat.common.ConstantValue;
import yaolin.chat.util.LoggerUtil;

/**
 * Server
 * @author yaolin
 */
public class Server {

    private final ServerSocket server;
    private final ExecutorService pool;

    public Server() throws IOException {
        server = new ServerSocket(ConstantValue.SERVER_PORT);
        pool = Executors.newFixedThreadPool(ConstantValue.MAX_POOL_SIZE);
    }

    public void start() {
        try {
            ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
            // Watch dog. Exception??
            // Note: if SocketSchedule#run() throws, scheduleAtFixedRate suppresses all later runs.
            schedule.scheduleAtFixedRate(new SocketSchedule(), 10, ConstantValue.TIME_OUT, TimeUnit.SECONDS);
            while (true) {
                // blocks until a client connects
                pool.execute(new SocketDispatcher(server.accept()));
                LoggerUtil.info("ACCEPT A CLIENT AT " + new Date());
            }
        } catch (IOException e) {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        try {
            new Server().start();
        } catch (IOException e) {
            LoggerUtil.error("Server start failed! -> " + e.getMessage(), e);
        }
    }
}

[SocketDispatcher.java]
Server is only the entry point; SocketDispatcher is the command center of the server. It dispatches client requests by message type so that different SocketHandlers process the corresponding requests. The server and the client exchange messages as JSON. All message classes inherit from BaseMessage, so the received data is first converted to BaseMessage and then its type is checked (the message types belong to the common module). Note that when the message is a file message, the dispatcher sleeps for the configured interval (ConstantValue.MESSAGE_PERIOD) so that FileHandler has time to read the file stream and forward it to the target client, instead of immediately entering the next loop iteration and checking the next message type (this design may be a bit problematic, but it will do for now). The following is the code of SocketDispatcher:
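Before the full listing, the idea of dispatching by message type can be shown on its own. This is a minimal sketch under stated assumptions: `DispatchSketch`, the numeric type codes, and the string-returning handlers are all hypothetical, and a lookup table is swapped in for the article's switch statement. JSON parsing is left out so the example needs no external library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical dispatch table: maps a numeric message type to a handler. */
final class DispatchSketch {
    // Assumed type codes; the values of the article's MessageType constants are not shown.
    static final int ALIVE = 1;
    static final int CHAT = 2;
    static final int FILE = 3;

    /** A handler receives the sender id and the raw message text and returns a description. */
    private final Map<Integer, BiFunction<String, String, String>> handlers = new HashMap<>();

    DispatchSketch() {
        handlers.put(ALIVE, (from, raw) -> "alive from " + from);
        handlers.put(CHAT, (from, raw) -> "chat from " + from + ": " + raw);
        handlers.put(FILE, (from, raw) -> "file from " + from);
    }

    /** Looks up the handler for the type; unknown types are reported instead of silently ignored. */
    String dispatch(int type, String from, String raw) {
        BiFunction<String, String, String> handler = handlers.get(type);
        if (handler == null) {
            return "unsupported type " + type;
        }
        return handler.apply(from, raw);
    }
}
```

A table like this keeps one entry per message type, so a type cannot be listed twice by accident, and an unknown type produces a visible result rather than falling through.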
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Date;
// Imports for the project's own classes (BaseMessage, MessageType, HandlerFactory,
// ConstantValue, LoggerUtil) and for the JSON library are omitted, as in the original.

/**
 * SocketDispatcher
 *
 * @author yaolin
 */
public class SocketDispatcher implements Runnable {

    private final Socket socket;

    public SocketDispatcher(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        if (socket != null) {
            while (!socket.isClosed()) {
                try {
                    InputStream is = socket.getInputStream();
                    String line = null;
                    StringBuffer sb = null;
                    if (is.available() > 0) {
                        BufferedReader bufr = new BufferedReader(new InputStreamReader(is));
                        sb = new StringBuffer();
                        while (is.available() > 0 && (line = bufr.readLine()) != null) {
                            sb.append(line);
                        }
                        LoggerUtil.trach("RECEIVE [" + sb.toString() + "] AT " + new Date());
                        BaseMessage message = JSON.parseObject(sb.toString(), BaseMessage.class);
                        switch (message.getType()) {
                        case MessageType.ALIVE:
                            HandlerFactory.getHandler(MessageType.ALIVE).handle(socket, sb.toString());
                            break;
                        case MessageType.CHAT:
                            HandlerFactory.getHandler(MessageType.CHAT).handle(socket, sb.toString());
                            break;
                        case MessageType.FILE:
                            // the listing contained this case twice; only the version with the pause is kept
                            HandlerFactory.getHandler(MessageType.FILE).handle(socket, sb.toString());
                            LoggerUtil.trach("SERVER:PAUSE TO RECEIVE FILE");
                            Thread.sleep(ConstantValue.MESSAGE_PERIOD);
                            break;
                        case MessageType.LOGIN:
                            HandlerFactory.getHandler(MessageType.LOGIN).handle(socket, sb.toString());
                            break;
                        case MessageType.LOGOUT:
                            break;
                        case MessageType.REGISTER:
                            HandlerFactory.getHandler(MessageType.REGISTER).handle(socket, sb.toString());
                            break;
                        }
                    } else {
                        // nothing to read yet: wait before polling again
                        Thread.sleep(ConstantValue.MESSAGE_PERIOD);
                    }
                } catch (Exception e) { // catch all handler exception
                    LoggerUtil.error("SocketDispatcher Error!" + e.getMessage(), e);
                }
            }
        }
    }
}

[SocketSchedule.java]
The other class (component) directly tied to Server is SocketSchedule. It checks whether the time since a client's latest interaction with the server exceeds the maximum allowed by the system configuration (ConstantValue.TIME_OUT, in seconds). If it does, the client's socket is removed from the server; otherwise the entry is left alone. The interaction time itself is refreshed elsewhere, by AliveHandler, whenever a heartbeat arrives. The specific implementation is as follows:
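Before the listing, the timeout rule can be checked with a small self-contained example. This is a sketch under assumptions, not the article's code: `HeartbeatSketch` is a hypothetical class, times are passed in as plain millisecond values so the behaviour is easy to test, and last-seen times are stored directly in a map rather than in a SocketWrapper.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical heartbeat table: client name -> time of last heartbeat in milliseconds. */
final class HeartbeatSketch {
    private final Map<String, Long> lastAliveMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat (the role AliveHandler plays in the article). */
    void touch(String client, long nowMillis) {
        lastAliveMillis.put(client, nowMillis);
    }

    /** Removes every client idle for longer than the timeout and returns how many were removed. */
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Long> entry : lastAliveMillis.entrySet()) {
            Long last = entry.getValue();
            // remove(key, value) only succeeds if no newer heartbeat replaced the value meanwhile
            if (nowMillis - last > timeoutMillis && lastAliveMillis.remove(entry.getKey(), last)) {
                removed++;
            }
        }
        return removed;
    }

    boolean isOnline(String client) {
        return lastAliveMillis.containsKey(client);
    }
}
```

With a 10-second timeout, a client last seen at 0 ms is removed by a sweep at 10,500 ms, while a client last seen at 9,000 ms stays. The conditional `remove(key, value)` is a design choice for this sketch: a heartbeat that arrives during the sweep is not thrown away.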
import java.util.Date;

/**
 * Remove socket from SocketHolder if lastAliveTime > TIME_OUT
 * @author yaolin
 */
public class SocketSchedule implements Runnable {

    @Override
    public void run() {
        for (String key : SocketHolder.keySet()) {
            SocketWrapper wrapper = SocketHolder.get(key);
            if (wrapper != null && wrapper.getLastAliveTime() != null) {
                // seconds since the last interaction with this client
                long idleSeconds = (new Date().getTime() - wrapper.getLastAliveTime().getTime()) / 1000;
                if (idleSeconds > ConstantValue.TIME_OUT) {
                    // remove socket if timeout
                    SocketHolder.remove(key);
                }
            }
        }
    }
}

[SocketHolder.java, SocketWrapper.java]
As the code above shows, SocketSchedule#run() is just a simple time comparison; the really meaningful parts are SocketHolder and SocketWrapper. SocketWrapper wraps a socket in a shell that adds the latest interaction time. SocketHolder stores all clients that have interacted with the server within the current valid period as key-value pairs: the KEY is the client's unique identifier (the user name here) and the VALUE is the wrapper around that client's socket. SocketHolder#flushClientStatus() notifies the other clients when a client goes online or offline. The specific implementation of these two classes is given below:
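Before the listing, the notify-everyone-else behaviour can be sketched without sockets. Everything here is an assumption made for illustration: `PresenceSketch` and its `Notifier` callback are hypothetical names, the callback stands in for sending a message over a socket, and, unlike the listing, `remove` only notifies when the user was actually registered.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry of online users that tells the others about status changes. */
final class PresenceSketch {
    /** Stand-in for sending a status message to one recipient. */
    interface Notifier {
        void send(String recipient, String subject, boolean online);
    }

    private final Map<String, Object> clients = new ConcurrentHashMap<>();
    private final Notifier notifier;

    PresenceSketch(Notifier notifier) {
        this.notifier = notifier;
    }

    /** Registers (or refreshes) a user and announces that the user is online. */
    void put(String user, Object connection) {
        clients.put(user, connection);
        broadcast(user, true);
    }

    /** Unregisters a user and announces it, but only if the user was registered. */
    void remove(String user) {
        if (clients.remove(user) != null) {
            broadcast(user, false);
        }
    }

    /** Sends the status of `subject` to every registered user except the subject itself. */
    private void broadcast(String subject, boolean online) {
        for (String recipient : clients.keySet()) {
            if (!recipient.equals(subject)) {
                notifier.send(recipient, subject, online);
            }
        }
    }
}
```

Note that in this shape every `put` triggers a broadcast, so refreshing an existing entry also re-announces the user; whether that is wanted depends on how often entries are refreshed.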
// SocketWrapper.java
import java.net.Socket;
import java.util.Date;

/**
 * Wrap Socket, SocketSchedule remove socket if lastAliveTime > TIME_OUT
 * @author yaolin
 */
public class SocketWrapper {

    private Socket socket;
    private Date lastAliveTime;

    // full constructor
    public SocketWrapper(Socket socket, Date lastAliveTime) {
        this.socket = socket;
        this.lastAliveTime = lastAliveTime;
    }

    public Socket getSocket() {
        return socket;
    }

    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    public Date getLastAliveTime() {
        return lastAliveTime;
    }

    public void setLastAliveTime(Date lastAliveTime) {
        this.lastAliveTime = lastAliveTime;
    }
}

// SocketHolder.java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * SocketHolder
 * @author yaolin
 */
public class SocketHolder {

    private static ConcurrentMap<String, SocketWrapper> listSocketWrap = new ConcurrentHashMap<String, SocketWrapper>();

    public static Set<String> keySet() {
        return listSocketWrap.keySet();
    }

    public static SocketWrapper get(String key) {
        return listSocketWrap.get(key);
    }

    public static void put(String key, SocketWrapper value) {
        listSocketWrap.put(key, value);
        flushClientStatus(key, true);
    }

    public static SocketWrapper remove(String key) {
        flushClientStatus(key, false);
        return listSocketWrap.remove(key);
    }

    public static void clear() {
        listSocketWrap.clear();
    }

    /**
     * <pre>content:{username:"",flag:false}</pre>
     * @param flag true:put,false:remove;
     */
    private static void flushClientStatus(String key, boolean flag) {
        ClientNotifyDTO dto = new ClientNotifyDTO(flag, key);
        ReturnMessage rm = new ReturnMessage().setKey(Key.NOTIFY).setSuccess(true).setContent(dto);
        rm.setFrom(ConstantValue.SERVER_NAME);
        for (String toKey : listSocketWrap.keySet()) {
            if (!toKey.equals(key)) { // not send to self
                rm.setTo(toKey);
                SocketWrapper wrap = listSocketWrap.get(toKey);
                if (wrap != null) {
                    SendHelper.send(wrap.getSocket(), rm);
                }
            }
        }
    }
}

[SocketHandler.java, HandlerFactory.java, OtherHandlerImpl.java]
SocketDispatcher lets different SocketHandlers handle the corresponding message requests. The design of SocketHandler is really just a simple factory (ReturnHandler's job is currently done by SendHelper, so ReturnHandler is not used for now; it has been marked @Deprecated but is still listed here). The complete class diagram is as follows:
The code for this part is given below. To save space, the code of all the Handler implementations is collected in one listing.
// Imports (java.net.Socket, java.io.InputStream, java.io.OutputStream, java.util.Date,
// the JSON library and the project's own classes) are omitted, as in the original.

/**
 * SocketHandler
 * @author yaolin
 */
public interface SocketHandler {
    /**
     * Handle Client Socket
     */
    public Object handle(Socket client, Object data);
}

/**
 * SocketHandlerFactory
 * @author yaolin
 */
public class HandlerFactory {

    // can not create instance
    private HandlerFactory() {}

    public static SocketHandler getHandler(int type) {
        switch (type) {
        case MessageType.ALIVE: // usually use
            return new AliveHandler();
        case MessageType.CHAT:
            return new ChatHandler();
        case MessageType.LOGIN:
            return new LoginHandler();
//      case MessageType.RETURN:
//          return new ReturnHandler();
        case MessageType.LOGOUT:
            return new LogoutHandler();
        case MessageType.REGISTER:
            return new RegisterHandler();
        case MessageType.FILE:
            return new FileHandler();
        }
        return null; // NullPointException
    }
}

/**
 * AliveSocketHandler
 * @author yaolin
 */
public class AliveHandler implements SocketHandler {

    /**
     * @return null
     */
    @Override
    public Object handle(Socket client, Object data) {
        if (data != null) {
            BaseMessage message = JSON.parseObject(data.toString(), BaseMessage.class);
            if (StringUtil.isNotEmpty(message.getFrom())) {
                SocketWrapper wrapper = SocketHolder.get(message.getFrom());
                if (wrapper != null) {
                    wrapper.setLastAliveTime(new Date()); // KEEP SOCKET ...
                    SocketHolder.put(message.getFrom(), wrapper);
                }
            }
        }
        return null;
    }
}

/**
 * ChatHandler
 *
 * @author yaolin
 */
public class ChatHandler implements SocketHandler {

    @Override
    public Object handle(Socket client, Object data) {
        if (data != null) {
            ChatMessage message = JSON.parseObject(data.toString(), ChatMessage.class);
            if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                // exist & send
                if (SocketHolder.keySet().contains(message.getFrom())) {
                    String owner = message.getFrom();
                    message.setOwner(owner); // owner will be displayed
                    if (ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-all
                        // TO_ALL TAB will be selected
                        message.setFrom(ConstantValue.TO_ALL);
                        for (String key : SocketHolder.keySet()) {
                            // also send to self
                            SocketWrapper wrapper = SocketHolder.get(key);
                            if (wrapper != null) {
                                SendHelper.send(wrapper.getSocket(), message);
                            }
                        }
                    } else { // one-to-one
                        SocketWrapper wrapper = SocketHolder.get(message.getTo());
                        if (wrapper != null) {
                            // owner = from
                            SendHelper.send(wrapper.getSocket(), message);
                            // also send to self
                            // TO TAB will be selected
                            message.setFrom(message.getTo()).setTo(owner);
                            SendHelper.send(client, message);
                        }
                    }
                }
            }
        }
        return null;
    }
}

public class FileHandler implements SocketHandler {

    @Override
    public Object handle(Socket client, Object data) {
        // data is dereferenced below, so it is checked together with client
        if (client != null && data != null) {
            FileMessage message = JSON.parseObject(data.toString(), FileMessage.class);
            if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                // exist & send
                if (SocketHolder.keySet().contains(message.getFrom())) {
                    if (!ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-one only, files are not broadcast
                        SocketWrapper wrapper = SocketHolder.get(message.getTo());
                        if (wrapper != null) {
                            SendHelper.send(wrapper.getSocket(), message);
                            try {
                                if (wrapper.getSocket() != null && message.getSize() > 0) {
                                    InputStream is = client.getInputStream();
                                    OutputStream os = wrapper.getSocket().getOutputStream();
                                    int total = 0;
                                    while (!client.isClosed() && !wrapper.getSocket().isClosed()) {
                                        if (is.available() > 0) {
                                            byte[] buff = new byte[ConstantValue.BUFF_SIZE];
                                            int len = -1;
                                            while (is.available() > 0 && (len = is.read(buff)) != -1) {
                                                os.write(buff, 0, len);
                                                total += len;
                                                LoggerUtil.debug("SEND BUFF [" + len + "]");
                                            }
                                            os.flush();
                                            if (total >= message.getSize()) {
                                                LoggerUtil.info("SEND BUFF [OK]");
                                                break;
                                            }
                                        }
                                    }
                                    // AFTER SEND FILE
                                    // SEND SUCCESSFULLY
                                    ReturnMessage result = new ReturnMessage().setKey(Key.TIP)
                                            .setSuccess(true)
                                            .setContent(I18N.INFO_FILE_SEND_SUCCESSFULLY);
                                    result.setFrom(message.getTo()).setTo(message.getFrom())
                                            .setOwner(ConstantValue.SERVER_NAME);
                                    SendHelper.send(client, result);
                                    // RECEIVE SUCCESSFULLY
                                    result.setContent(I18N.INFO_FILE_RECEIVE_SUCCESSFULLY)
                                            .setFrom(message.getFrom())
                                            .setTo(message.getTo());
                                    SendHelper.send(wrapper.getSocket(), result);
                                }
                            } catch (Exception e) {
                                LoggerUtil.error("Handle file failed !" + e.getMessage(), e);
                            }
                        }
                    }
                }
            }
        }
        return null;
    }
}

/**
 * LoginHandler
 *
 * @author yaolin
 */
public class LoginHandler implements SocketHandler {

    private UsrService usrService = new UsrService();

    @Override
    public Object handle(Socket client, Object data) {
        ReturnMessage result = new ReturnMessage();
        result.setSuccess(false);
        if (data != null) {
            LoginMessage message = JSON.parseObject(data.toString(), LoginMessage.class);
            if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
                if (usrService.login(message.getUsername(), message.getPassword()) != null) {
                    result.setSuccess(true);
                } else {
                    result.setMessage(I18N.INFO_LOGIN_ERROR_DATA);
                }
                result.setFrom(ConstantValue.SERVER_NAME).setTo(message.getUsername());
            } else {
                result.setMessage(I18N.INFO_LOGIN_EMPTY_DATA);
            }
            // AFTER LOGIN
            result.setKey(Key.LOGIN);
            if (result.isSuccess()) { // HOLD SOCKET
                SocketHolder.put(result.getTo(), new SocketWrapper(client, new Date()));
            }
            SendHelper.send(client, result);
            if (result.isSuccess()) { // SEND LIST USER
                ClientListUserDTO dto = new ClientListUserDTO();
                dto.setListUser(SocketHolder.keySet());
                result.setContent(dto).setKey(Key.LISTUSER);
                SendHelper.send(client, result);
            }
        }
        return null;
    }
}

public class LogoutHandler implements SocketHandler {

    @Override
    public Object handle(Socket client, Object data) {
        if (data != null) {
            LogoutMessage message = JSON.parseObject(data.toString(), LogoutMessage.class);
            if (message != null && StringUtil.isNotEmpty(message.getFrom())) {
                SocketWrapper wrapper = SocketHolder.get(message.getFrom());
                // wrapper is null when the user is not (or no longer) registered
                Socket socket = (wrapper != null) ? wrapper.getSocket() : null;
                if (socket != null) {
                    try {
                        socket.close();
                        socket = null;
                    } catch (Exception ignore) {
                    }
                }
                SocketHolder.remove(message.getFrom());
            }
        }
        return null;
    }
}

public class RegisterHandler implements SocketHandler {

    private UsrService usrService = new UsrService();

    @Override
    public Object handle(Socket client, Object data) {
        ReturnMessage result = new ReturnMessage();
        result.setSuccess(false).setFrom(ConstantValue.SERVER_NAME);
        if (data != null) {
            RegisterMessage message = JSON.parseObject(data.toString(), RegisterMessage.class);
            if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
                if (usrService.register(message.getUsername(), message.getPassword()) != null) {
                    result.setSuccess(true).setContent(I18N.INFO_REGISTER_OK);
                } else {
                    result.setMessage(I18N.INFO_REGISTER_CLIENT_EXIST);
                }
            } else {
                result.setMessage(I18N.INFO_REGISTER_EMPTY_DATA);
            }
            if (StringUtil.isNotEmpty(message.getUsername())) {
                result.setTo(message.getUsername());
            }
            // AFTER REGISTER
            result.setKey(Key.REGISTER);
            SendHelper.send(client, result);
        }
        return null;
    }
}

/**
 * Use SendHelper to send ReturnMessage,
 * @see yaolin.chat.server.SocketDispatcher#run()
 * @author yaolin
 */
@Deprecated
public class ReturnHandler implements SocketHandler {

    /**
     * @param data ReturnMessage
     */
    @Override
    public Object handle(Socket client, Object data) {
        if (data != null) {
            ReturnMessage message = (ReturnMessage) data;
            if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
                SocketWrapper wrap = SocketHolder.get(message.getTo());
                if (wrap != null) {
                    SendHelper.send(wrap.getSocket(), message);
                }
            }
        }
        return null;
    }
}

User business:
Besides sockets, the server has a small amount of business logic: user registration, login, and so on. Only the two classes Usr and UsrService are listed here. This business logic is not fully implemented yet. I do not intend to introduce an ORM framework into this program, so I wrote my own DBUtil (still to be improved), which is also posted here.
For now only a simple check is performed, and users are kept in memory rather than persisted to a database. Here are Usr and UsrService:
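Before the listing, here is one way an in-memory user store like this could be made safe for use from several pool threads. It is a sketch under assumptions and not the author's implementation: `UserStoreSketch` is a hypothetical class, a salted PBKDF2 hash from the JDK is swapped in for the article's MD5Util (whose code is not shown), and the iteration count of 10,000 is an arbitrary value chosen for illustration.

```java
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

/** Hypothetical in-memory user store; nothing is persisted. */
final class UserStoreSketch {
    /** Salt and derived hash for one user. */
    private static final class Credential {
        final byte[] salt;
        final byte[] hash;

        Credential(byte[] salt, byte[] hash) {
            this.salt = salt;
            this.hash = hash;
        }
    }

    private final ConcurrentMap<String, Credential> users = new ConcurrentHashMap<>();
    private final SecureRandom random = new SecureRandom();

    /** Returns true if the user was created, false if the input is empty or the name is taken. */
    boolean register(String username, char[] password) {
        if (username == null || username.isEmpty() || password == null || password.length == 0) {
            return false;
        }
        byte[] salt = new byte[16];
        random.nextBytes(salt);
        Credential credential = new Credential(salt, hash(password, salt));
        // putIfAbsent makes "check, then insert" a single atomic step
        return users.putIfAbsent(username, credential) == null;
    }

    /** Returns true only if the user exists and the password matches. */
    boolean login(String username, char[] password) {
        if (username == null || password == null || password.length == 0) {
            return false;
        }
        Credential credential = users.get(username);
        return credential != null && MessageDigest.isEqual(credential.hash, hash(password, credential.salt));
    }

    private static byte[] hash(char[] password, byte[] salt) {
        try {
            PBEKeySpec spec = new PBEKeySpec(password, salt, 10_000, 256);
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is not available", e);
        }
    }
}
```

The main design point is `putIfAbsent`: with a plain map and a separate `containsKey` check, two registrations for the same name arriving at the same time could both appear to succeed.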
// Usr.java
public class Usr {

    private long id;
    private String username;
    private String password;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

// UsrService.java
import java.util.HashMap;
import java.util.Map;

/**
 * // TODO
 * @see yaolin.chat.server.usr.repository.UsrRepository
 * @author yaolin
 */
public class UsrService {

    // TODO db
    // in-memory store only; a plain HashMap is not safe for concurrent access
    private static Map<String, Usr> db = new HashMap<String, Usr>();

    public Usr register(String username, String password) {
        if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
            return null;
        }
        if (db.containsKey(username)) {
            return null; // exist;
        }
        Usr usr = new Usr();
        usr.setUsername(username);
        usr.setPassword(MD5Util.getMD5Code(password));
        db.put(username, usr);
        return usr;
    }

    public Usr login(String username, String password) {
        if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
            return null;
        }
        if (db.containsKey(username)) {
            Usr usr = db.get(username);
            if (MD5Util.getMD5Code(password).equals(usr.getPassword())) {
                return usr;
            }
        }
        return null;
    }
}

Here is the DBUtil tool:
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

/**
 * DBUtils // TODO needs to be adjusted & optimized!!
 * @author yaolin
 */
public class DBUtil {

    // make connection used repeatedly
    private static final List<Connection> cache = new LinkedList<Connection>();
    private static String url;
    private static String driver;
    private static String user;
    private static String password;
    private static Boolean debug;

    static {
        InputStream is = DBUtil.class.getResourceAsStream("/db.properties");
        try {
            Properties p = new Properties();
            p.load(is);
            url = p.getProperty("url");
            driver = p.getProperty("driver");
            user = p.getProperty("user");
            password = p.getProperty("password");
            // just for debug
            try {
                debug = Boolean.valueOf(p.getProperty("debug"));
            } catch (Exception ignore) {
                debug = false;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (is != null) {
                try {
                    is.close();
                    is = null;
                } catch (Exception ignore) {
                }
            }
        }
    }

    public synchronized static Connection getConnection() {
        if (cache.isEmpty()) {
            cache.add(makeConnection());
        }
        Connection conn = null;
        int i = 0;
        try {
            // take cached connections until an open one is found or the cache is empty
            do {
                conn = cache.remove(i);
            } while (conn != null && conn.isClosed() && i < cache.size());
        } catch (Exception ignore) {
        }
        try {
            if (conn == null || conn.isClosed()) {
                cache.add(makeConnection());
                conn = cache.remove(0);
            }
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized static void close(Connection connection) {
        try {
            if (connection != null && !connection.isClosed()) {
                if (debug) debug("release connection!");
                cache.add(connection);
            }
        } catch (SQLException ignore) {
        }
    }

    public static Object query(String sql, ResultSetMapper mapper, Object... args) {
        if (debug) debug(sql);
        Connection conn = getConnection();
        PreparedStatement ps = null;
        ResultSet rs = null;
        Object result = null;
        try {
            ps = conn.prepareStatement(sql);
            int i = 1;
            for (Object object : args) {
                ps.setObject(i++, object);
            }
            rs = ps.executeQuery();
            result = mapper.mapper(rs);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (rs != null) {
                    rs.close();
                    rs = null;
                }
                if (ps != null) {
                    ps.close();
                    ps = null;
                }
            } catch (Exception ignore) {
            }
            // return the connection to the cache even when the query failed
            close(conn);
        }
        return result;
    }

    public static int modify(String sql, Object... args) {
        if (debug) debug(sql);
        Connection conn = getConnection();
        PreparedStatement ps = null;
        int row = 0;
        try {
            ps = conn.prepareStatement(sql);
            int i = 1;
            for (Object object : args) {
                ps.setObject(i++, object);
            }
            row = ps.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                    ps = null;
                }
            } catch (Exception ignore) {
            }
            close(conn);
        }
        return row;
    }

    public static int[] batch(List<String> sqls) {
        if (debug) debug(sqls.toString());
        Connection conn = getConnection();
        Statement stmt = null;
        int[] row;
        try {
            stmt = conn.createStatement();
            for (String sql : sqls) {
                stmt.addBatch(sql);
            }
            row = stmt.executeBatch();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                    stmt = null;
                }
            } catch (Exception ignore) {
            }
            close(conn);
        }
        return row;
    }

    public static int[] batch(String sql, PreparedStatementSetter setter) {
        if (debug) debug(sql);
        Connection conn = getConnection();
        PreparedStatement ps = null;
        int[] row;
        try {
            ps = conn.prepareStatement(sql);
            setter.setter(ps);
            row = ps.executeBatch();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                    ps = null;
                }
            } catch (Exception ignore) {
            }
            close(conn);
        }
        return row;
    }

    private static Connection makeConnection() {
        try {
            // loading the driver class is enough for it to register itself with DriverManager
            Class.forName(driver);
            Connection conn = DriverManager.getConnection(url, user, password);
            if (debug) debug("create connection!");
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static void debug(String sqls) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(sdf.format(new Date()) + " DEBUG " + Thread.currentThread().getId()
                + " --- [" + Thread.currentThread().getName() + "] " + "execute sqls : " + sqls);
    }
}

/**
 * PreparedStatementSetter
 * @author yaolin
 */
public interface PreparedStatementSetter {
    public void setter(PreparedStatement ps);
}

/**
 * ResultSetMapper
 * @author yaolin
 */
public interface ResultSetMapper {
    public Object mapper(ResultSet rs);
}

Source code download: demo