What is Memcache?
Cache solution in Memcache cluster environment
Memcache is a high-performance distributed in-memory object caching system. It maintains one large hash table in memory and can store data in many formats, including images, videos, files, and database query results. Simply put, data is loaded into memory once and then read from memory, which greatly improves read speed.
Memcache is a project of Danga Interactive. It was originally developed to speed up LiveJournal, its first user, and was later adopted by many large websites.
Memcached runs as a daemon on one or more servers and accepts client connections and operations at any time.
Why are there two names Memcache and memcached?
In fact, Memcache is the name of the project, and memcached is the file name of its main server-side program. Many people online do not know the difference and use the two names interchangeably.
Memcached is a high-performance, distributed memory object caching system used to reduce database load and improve access speed in dynamic applications. It was developed by Danga Interactive to speed up LiveJournal.com, which serves thousands of dynamic page views per second for 7 million users. Memcached greatly reduces the database load, which allows better resource allocation and faster access.
This article covers the following: the Memcache protocol, Java sockets, and a code implementation.
Memcache
Memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of database calls, API calls, or page rendering.
In other words, it is an in-memory cache database that stores key-value pairs. It temporarily keeps data obtained from other services in memory, so that repeated requests can be answered directly from the cache. This both speeds up access and reduces the load on those services. This article implements a single-server version of Memcache that supports simultaneous connections from multiple clients.
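The hit-or-miss behavior described above can be sketched as follows. This is only an illustration: the class name ReadThroughCache, the backend function, and the backendCalls counter are made up for this example and are not part of the project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of the article's project: look in the
// in-memory map first and only call the slower backing service on a miss.
class ReadThroughCache {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> backend; // stands in for a database or API call
    private int backendCalls = 0;

    ReadThroughCache(Function<String, String> backend) {
        this.backend = backend;
    }

    String get(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached;               // cache hit: the backend is not touched
        }
        backendCalls++;                  // cache miss: ask the slower service
        String value = backend.apply(key);
        if (value != null) {
            cache.put(key, value);       // remember the result for later requests
        }
        return value;
    }

    int backendCalls() {
        return backendCalls;
    }
}
```

Calling get twice with the same key reaches the backend only once; the second call is served from memory.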
The client connects to the server over telnet and then interacts with the cache according to the Memcache protocol. The commands implemented here are get, set and del. Let's look at the format of each command.
set
set is the storage command. It spans two lines: the first line carries the metadata and the second line carries the value.
set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n
If the value is stored successfully, the server returns STORED. If the command contains the noreply option, the server returns nothing.
In the memcached protocol, the fields of this command are as follows:
key: the key under which the value is stored
flags: an opaque integer that the server stores and returns together with the value
exptime: the expiration time in seconds; 0 means the item never expires
bytes: the length of the value in bytes, not counting the trailing \r\n
noreply: optional; tells the server not to send a response
If the command is malformed, the server returns ERROR.
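To make the first line of a set command concrete, here is a minimal parsing sketch. The SetHeader class and its field names are hypothetical and do not come from the project; the sketch only assumes the format set <key> <flags> <exptime> <bytes> [noreply] given above.

```java
// Hypothetical helper, not part of the article's project: parses the
// first line of a set command into its fields.
final class SetHeader {
    final String key;
    final int flags;
    final long exptime;
    final int bytes;
    final boolean noReply;

    private SetHeader(String key, int flags, long exptime, int bytes, boolean noReply) {
        this.key = key;
        this.flags = flags;
        this.exptime = exptime;
        this.bytes = bytes;
        this.noReply = noReply;
    }

    // Throws IllegalArgumentException for a malformed line, which a server
    // could translate into the ERROR response.
    static SetHeader parse(String line) {
        String[] parts = line.trim().split("\\s+");
        boolean hasNoReply = parts.length == 6 && parts[5].equals("noreply");
        if (!parts[0].equals("set") || !(parts.length == 5 || hasNoReply)) {
            throw new IllegalArgumentException("ERROR");
        }
        // parseInt/parseLong throw NumberFormatException, which is a
        // subclass of IllegalArgumentException.
        int flags = Integer.parseInt(parts[2]);
        long exptime = Long.parseLong(parts[3]);
        int bytes = Integer.parseInt(parts[4]);
        if (bytes < 0) {
            throw new IllegalArgumentException("ERROR");
        }
        return new SetHeader(parts[1], flags, exptime, bytes, hasNoReply);
    }
}
```

For example, SetHeader.parse("set user:1 0 60 5 noreply") yields the key user:1, a 60 second expiration, a 5 byte value, and noReply set to true.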
get
get is the retrieval command. Its format is as follows:
get <key>*\r\n
It accepts multiple keys. For every key found in the cache, the corresponding data is returned, and the response ends with END. A key that is not found is simply left out of the response. The format is as follows:
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
END\r\n
del
del is the delete command. (The official memcached protocol spells this command delete; del is the name used by this implementation.) Its format is as follows:
del <key> [noreply]\r\n
If the deletion succeeds, the server returns DELETED\r\n; otherwise it returns NOT_FOUND\r\n. If the noreply option is present, the server returns no response.
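The get response format described earlier can be sketched as a small formatter. GetResponse is a hypothetical name used only here, and the sketch assumes flags are always 0 and values are UTF-8 strings, which the article does not state.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper, not part of the article's project: renders the
// response to a get command for the keys that were found in the cache.
class GetResponse {
    static String format(Map<String, String> hits) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> hit : hits.entrySet()) {
            // <bytes> is the length of the data block in bytes, not characters
            int length = hit.getValue().getBytes(StandardCharsets.UTF_8).length;
            sb.append("VALUE ").append(hit.getKey())
              .append(" 0 ")                       // assumption: flags fixed at 0
              .append(length).append("\r\n")
              .append(hit.getValue()).append("\r\n");
        }
        // Missing keys are simply absent; the response always ends with END
        return sb.append("END\r\n").toString();
    }
}
```

With no hits at all, the response consists of the END line only.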
JAVA SOCKET
All you need to know about Java sockets here is the TCP protocol, sockets, and I/O streams. I won't go into details; you can refer to my series of articles, and the book Java Network Programming is also recommended.
Code implementation
The class diagram could not be embedded here. You can go to the project address at the end of the article to view it.
The command pattern and the factory pattern are used to decouple the creation of a command from its execution. The command factory receives the commandLine and returns a Command instance. Each Command has an execute method that performs its own operation. Only the concrete implementation of the del command is shown here.
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;

/**
 * A protocol command.
 * Currently supports get, set and del,
 * plus the custom error and end commands.
 */
public interface Command {

    /**
     * Execute the command.
     * @param reader reads further input from the client
     * @param writer writes the response to the client
     */
    void execute(Reader reader, Writer writer);

    /**
     * Get the type of the command.
     * @return the command type
     */
    CommandType getType();
}

/**
 * Singleton command factory.
 */
public class CommandFactory {

    private static CommandFactory commandFactory;
    private static Cache<Item> memcache;

    private CommandFactory() {}

    // synchronized because several handler threads may call this concurrently
    public static synchronized CommandFactory getInstance(Cache<Item> cache) {
        if (commandFactory == null) {
            commandFactory = new CommandFactory();
            memcache = cache;
        }
        return commandFactory;
    }

    /**
     * Create the Command that matches the command line.
     * @param commandLine the line received from the client
     * @return the matching command, or an ErrorCommand if none matches
     */
    public Command getCommand(String commandLine) {
        if (commandLine.matches("^set .*$")) {
            return new SetCommand(commandLine, memcache);
        } else if (commandLine.matches("^get .*$")) {
            return new GetCommand(commandLine, memcache);
        } else if (commandLine.matches("^del .*$")) {
            return new DeleteCommand(commandLine, memcache);
        } else if (commandLine.matches("^end$")) {
            return new EndCommand(commandLine);
        } else {
            return new ErrorCommand(commandLine, ErrorCommand.ErrorType.ERROR);
        }
    }
}

/**
 * Delete command.
 */
public class DeleteCommand implements Command {

    private final String command;
    private final Cache<Item> cache;
    private String key;
    private boolean noReply;

    public DeleteCommand(final String command, final Cache<Item> cache) {
        this.command = command;
        this.cache = cache;
        initCommand();
    }

    private void initCommand() {
        // Format: del <key> [noreply]
        String[] info = command.trim().split("\\s+");
        key = info.length > 1 ? info[1] : "";
        // Only the third token counts, so a key containing "noreply" is not misread
        noReply = info.length > 2 && "noreply".equals(info[2]);
    }

    @Override
    public void execute(Reader reader, Writer writer) {
        BufferedWriter bfw = (BufferedWriter) writer;
        Item item = cache.delete(key);
        if (!noReply) {
            try {
                if (item == null) {
                    bfw.write("NOT_FOUND\r\n");
                } else {
                    bfw.write("DELETED\r\n");
                }
                bfw.flush();
            } catch (IOException e) {
                try {
                    bfw.write("ERROR\r\n");
                    bfw.flush();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                e.printStackTrace();
            }
        }
    }

    @Override
    public CommandType getType() {
        return CommandType.SEARCH;
    }
}

Next, implement the cache server. To evict the least recently used entry once the cache is full, an access-ordered LinkedHashMap is used as the underlying implementation, and its removeEldestEntry method is overridden. A CacheManager background thread also clears expired cache entries periodically.
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class Memcache implements Cache<Item> {

    private final Logger logger = Logger.getLogger(Memcache.class.getName());

    // Access-ordered LinkedHashMap implements LRU eviction.
    // All access is guarded by this object's monitor (synchronized methods).
    private final LinkedHashMap<String, Item> cache;

    private final int maxSize;

    // Load factor
    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    public Memcache(final int maxSize) {
        this.maxSize = maxSize;
        // Make sure the map does not resize once maxSize is reached
        int capacity = (int) Math.ceil(maxSize / DEFAULT_LOAD_FACTOR) + 1;
        this.cache = new LinkedHashMap<String, Item>(capacity, DEFAULT_LOAD_FACTOR, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Item> eldest) {
                boolean overLimit = size() > maxSize;
                if (overLimit) {
                    logger.info("The cache has reached its upper limit; the least recently used entry will be deleted");
                }
                return overLimit;
            }
        };
    }

    public synchronized boolean isFull() {
        return cache.size() >= maxSize;
    }

    @Override
    public synchronized Item get(String key) {
        Item item = cache.get(key);
        if (item == null) {
            logger.info("Key " + key + " does not exist in cache");
            return null;
        }
        if (item.isExpired()) {
            // If the entry has expired, delete it and return null
            logger.info("Read key from cache: " + key + " value: " + item.getValue() + " has expired");
            cache.remove(key);
            return null;
        }
        logger.info("Read key from cache: " + key + " value: " + item.getValue()
                + " remaining valid time: " + item.remainTime());
        return item;
    }

    @Override
    public synchronized void set(String key, Item value) {
        logger.info("Write key to cache: " + key + " value: " + value);
        cache.put(key, value);
    }

    @Override
    public synchronized Item delete(String key) {
        logger.info("Delete key from cache: " + key);
        return cache.remove(key);
    }

    @Override
    public synchronized int size() {
        return cache.size();
    }

    @Override
    public int capacity() {
        return maxSize;
    }

    // Callers must hold this object's monitor while using the iterator
    @Override
    public Iterator<Map.Entry<String, Item>> iterator() {
        return cache.entrySet().iterator();
    }
}

/**
 * Cache manager.
 * Background thread that deletes expired entries from the cache.
 */
public class CacheManager implements Runnable {

    private final Logger logger = Logger.getLogger(CacheManager.class.getName());

    // Cache
    public Cache<Item> cache;

    public CacheManager(Cache<Item> cache) {
        this.cache = cache;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Lock the cache so that no other thread modifies it during iteration.
            // This relies on the cache synchronizing on itself, as Memcache does.
            synchronized (cache) {
                Iterator<Map.Entry<String, Item>> itemIterator = cache.iterator();
                while (itemIterator.hasNext()) {
                    Map.Entry<String, Item> entry = itemIterator.next();
                    Item item = entry.getValue();
                    if (item.isExpired()) {
                        logger.info("key: " + entry.getKey() + " value: " + item.getValue()
                                + " expired, deleted from the cache");
                        itemIterator.remove();
                    }
                }
            }
            try {
                // Run the cleanup every 5 seconds
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop the background thread
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Finally, implement a multi-threaded socket server: the ServerSocket is bound to a port, and each accepted Socket is handed over to a worker thread for processing.
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Server.
 */
public class IOServer implements Server {

    // Port number
    private final int port;

    // Server socket
    private ServerSocket serverSocket;

    private final Logger logger = Logger.getLogger(IOServer.class.getName());

    // Thread pool; its size is maxConnection
    private final ExecutorService executorService;

    private final Cache<Item> cache;

    public IOServer(int port, int maxConnection, Cache<Item> cache) {
        if (maxConnection <= 0) {
            throw new IllegalArgumentException("The maximum number of supported connections must be a positive integer");
        }
        this.port = port;
        executorService = Executors.newFixedThreadPool(maxConnection);
        this.cache = cache;
    }

    @Override
    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            logger.info("The server starts on port " + port);
            // Stop accepting once shutDown() has closed the server socket
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    logger.info("Received a connection from " + socket.getInetAddress());
                    executorService.submit(new SocketHandler(socket, cache));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            logger.log(Level.WARNING, "The server is about to be shut down...");
            e.printStackTrace();
        } finally {
            executorService.shutdown();
            shutDown();
        }
    }

    /**
     * Is the server still running?
     * @return true if the server socket is open
     */
    public boolean isRunning() {
        return serverSocket != null && !serverSocket.isClosed();
    }

    /**
     * Stop the server.
     */
    public void shutDown() {
        try {
            if (serverSocket != null) {
                serverSocket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Handles the connection of one client.
 * Closes the connection after receiving the end command.
 */
public class SocketHandler implements Runnable {

    private static final Logger logger = Logger.getLogger(SocketHandler.class.getName());

    private final Socket socket;
    private final Cache<Item> cache;
    private boolean finish;

    public SocketHandler(Socket s, Cache<Item> cache) {
        this.socket = s;
        this.cache = cache;
    }

    @Override
    public void run() {
        try {
            // Get the socket input stream
            final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            // Get the socket output stream
            final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            CommandFactory commandFactory = CommandFactory.getInstance(cache);
            while (!finish) {
                final String commandLine = reader.readLine();
                if (commandLine == null) {
                    // readLine returns null when the client has closed the connection
                    break;
                }
                logger.info("ip: " + socket.getInetAddress() + " command: " + commandLine);
                if (commandLine.trim().isEmpty()) {
                    continue;
                }
                // Use the command factory to get the command instance
                final Command command = commandFactory.getCommand(commandLine);
                command.execute(reader, writer);
                if (command.getType() == CommandType.END) {
                    logger.info("Request to close the connection");
                    finish = true;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            logger.info("Close the connection from " + socket.getInetAddress());
        } finally {
            try {
                if (socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Please click here for the project address. If you find it useful, I hope you can give it a star.
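As a standalone illustration of the eviction behavior that the Memcache class relies on, the following sketch shows an access-ordered LinkedHashMap dropping its least recently used entry. LruDemo and lruMap are names made up for this example; only LinkedHashMap and removeEldestEntry come from the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical demo class, not part of the article's project.
class LruDemo {

    // Builds a map that evicts the least recently used entry once it
    // holds more than maxSize entries.
    static <K, V> Map<K, V> lruMap(final int maxSize) {
        // The third constructor argument (true) selects access order
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = lruMap(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");          // "a" becomes the most recently used entry
        cache.put("c", "3");     // exceeds the limit, so "b" is evicted
        System.out.println(cache.keySet()); // prints [a, c]
    }
}
```

Had the map been created in insertion order (the default), the same sequence would have evicted "a" instead, which is first-in-first-out rather than LRU.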
References
memcached official website
Memcache protocol