Preface
Synchronization within a single JVM is easy to deal with. Just use the lock provided by JDK directly, but cross-process synchronization is definitely impossible. In this case, you must rely on a third party. I use Redis here, and of course there are many other implementation methods. In fact, the principle based on Redis implementation is quite simple. Before reading the code, it is recommended that you check the principle first. After reading the code, it should be easier to understand.
I do not implement the JDK's java.util.concurrent.locks.Lock interface, but customize one, because there is a newCondition method in JDK. I have not implemented it for the time being. This Lock provides 5 variants of lock methods. You can choose which one to use to obtain the lock. My idea is that it is best to use the methods with timeout return. Because if this is not the case, if redis is hung, the thread will always be in the dead loop (about this, it should be further optimized. If redis is hung, Jedis's operation will definitely throw exceptions and so on. You can define a mechanism to notify users who use this lock when redis is hung, or threads)
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;public interface Lock { /** * Blocking acquisition lock, not responding to interrupt*/ void lock; /** * Blocking acquisition lock, not responding to interrupt* * @throws InterruptedException */ void lockInterruptibly throws InterruptedException; /** * Try to acquire the lock, return immediately without obtaining it, not blocking*/ boolean tryLock; /** * The blocking acquisition lock automatically returned by timeout, not responding to interrupt* * @param time * @param unit * @return {@code true} If the lock is successfully acquired, {@code false} If the lock is not retrieved within the specified time* */ boolean tryLock(long time, TimeUnit unit); /** * The blocking acquisition lock automatically returned by timeout, response interrupt* * @param time * @param unit * @return {@code true} If the lock is successfully acquired, {@code false} If the lock is not retrieved within the specified time* @throws InterruptedException The current thread trying to acquire the lock is interrupted*/ boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException; /** * Release lock*/ void unlock; }Look at its abstract implementation:
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;/** * The skeleton implementation of the lock, the real steps to acquire the lock are implemented by subclasses. * * @author lixiaohui * */public abstract class AbstractLock implements Lock { /** * <pre> * Whether visibility needs to be guaranteed here is worth discussing, because it is a distributed lock, * 1. It is also possible for multiple threads of the same jvm to use different lock objects, and in this case there is no need to guarantee visibility* 2. Multiple threads of the same jvm to use the same lock object, then visibility must be guaranteed. * </pre> */ protected volatile boolean locked; /** * The thread currently holding the lock in jvm (if have one) */ private Thread exclusiveOwnerThread; public void lock { try { lock(false, 0, null, false); } catch (InterruptedException e) { // TODO ignore } } public void lockInterruptibly throws InterruptedException { lock(false, 0, null, true); } public boolean tryLock(long time, TimeUnit unit) { try { return lock(true, time, unit, false); } catch (InterruptedException e) { // TODO ignore } return false; } public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException { return lock(true, time, unit, true); } public void unlock { // TODO Check whether the current thread holds the lock if (Thread.currentThread != getExclusiveOwnerThread) { throw new IllegalMonitorStateException("current thread does not hold the lock"); } unlock0; setExclusiveOwnerThread(null); } protected void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread { return exclusiveOwnerThread; } protected abstract void unlock0; /** * Implementation of blocking acquisition lock* * @param useTimeout * @param time * @param unit * @param interrupt Whether to respond to interrupts* @return * @throws InterruptedException */ protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;} Based on the final implementation of Redis, the key code for acquiring and releasing the lock is in the lock method and unlock0 method of this class. You can only look at these two methods and write one completely by yourself:
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis;/** * <pre> * Distributed lock implemented by SETNX operation based on Redis* * It is best to use lock(long time, TimeUnit unit) when acquiring locks to avoid network problems causing threads to block all the time* * <a href="http://redis.io/commands/setnx">SETNC operation reference</a> * </pre> * * @author lixiaohui * */public class RedisBasedDistributedLock extends AbstractLock { private Jedis jedis; // The name of the lock protected String lockKey; // The validity duration of the lock (ms) protected long lockExpires; public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) { this.jedis = jedis; this.lockKey = lockKey; this.lockExpires = lockExpires; } // Implementation of blocking acquisition lock protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{ if (interrupt) { checkInterruption; } long start = System.currentTimeMillis; long timeout = unit.toMillis(time); // if !useTimeout, then it's useless while (useTimeout ? isTimeout(start, timeout) : true) { if (interrupt) { checkInterruption; } long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//lock timeout String stringOfLockExpireTime = String.valueOf(lockExpireTime); if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // Obtained lock// TODO successfully obtained the lock, set the relevant identifier locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // Assume that multiple threads (non-single jvm) come here at the same time String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // But the oldValue obtained by each thread when it comes here is definitely impossible to be the same (because getset is atomic) // The oldValue obtained by joining is still expired, then it means that the lock is obtained if (oldValue != null && isTimeExpired(oldValue)) { // TODO successfully obtains the lock, set the relevant identifier locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } } else { // TODO lock is not expired, enter next loop retrying } } return false; } public boolean tryLock { long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//Lock timeout time String stringOfLockExpireTime = String.valueOf(lockExpireTime); if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // Obtain the lock// TODO successfully acquires the lock, set the relevant identifier locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // Assume multiple threads (not single jvm) come here at the same time String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // But the oldValue obtained by each thread when it comes here is definitely impossible (because getset is atomic) // If the oldValue you get is still expired, then it means you have got the lock if (oldValue != null && isTimeExpired(oldValue)) { // TODO successfully obtained the lock, set the relevant identifier locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } } else { // TODO lock is not expired, enter next loop retrying } return false; } /** * Queries if this lock is held by any thread. * * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ public boolean isLocked { if (locked) { return true; } else { String value = jedis.get(lockKey); // TODO There is actually a problem here. Think: When the get method returns the value, assume that the value has expired, // At this moment, another node sets the value, and the lock is held by another thread (the node holds), and the next judgment // cannot detect this situation. However, this problem should not cause other problems, because the purpose of this method is // is not synchronous control, it is just a report of the lock status. return !isTimeExpired(value); } } @Override protected void unlock0 { // TODO determines whether the lock expires String value = jedis.get(lockKey); if (!isTimeExpired(value)) { doUnlock; } } private void checkInterruption throws InterruptedException { if(Thread.currentThread.isInterrupted) { throw new InterruptedException; } } private boolean isTimeExpired(String value) { return Long.parseLong(value) < System.currentTimeMillis; } private boolean isTimeout(long start, long timeout) { return start + timeout > System.currentTimeMillis; } private void doUnlock { jedis.del(lockKey); }} If you change the implementation method in the future (such as zookeeper , etc.), then you can directly inherit AbstractLock and implement l ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) and unlock0 method (so-called abstraction)
test
Simulate the global ID grower and design an IDGenerator class. This class is responsible for generating global incremental IDs. Its code is as follows:
package cc.lixiaohui.lock;import java.math.BigInteger;import java.util.concurrent.TimeUnit;/** * Simulate ID generation* @author lixiaohui * */public class IDGenerator { private static BigInteger id = BigInteger.valueOf(0); private final Lock lock; private static final BigInteger INCREMENT = BigInteger.valueOf(1); public IDGenerator(Lock lock) { this.lock = lock; } public String getAndIncrement { if (lock.tryLock(3, TimeUnit.SECONDS)) { try { // TODO Get the lock here and access the critical area resource return getAndIncrement0; } finally { lock.unlock; } } return null; //return getAndIncrement0; } private String getAndIncrement0 { String s = id.toString; id = id.add(INCREMENT); return s; }} Test main logic: open two threads in the same JVM to loop deadly (there is no interval between loops, if there is one, the test will be meaningless) to get ID (I am not a dead loop but run for 20s), get the ID and store it in the same Set . Before it is stored, check whether the ID exists in set . If it already exists, let both threads stop. If the program can run 20 seconds normally, it means that this distributed lock can meet the requirements. The effect of such a test should be the same as that of different JVMs (that is, in a real distributed environment). The following is the code of the test class:
package cc.lixiaohui.DistributedLock.DistributedLock;import java.util.HashSet;import java.util.Set;import org.junit.Test;import redis.clients.jedis.Jedis;import cc.lixiaohui.lock.IDGenerator;import cc.lixiaohui.lock.Lock;import cc.lixiaohui.lock.RedisBasedDistributedLock;public class IDGeneratorTest { private static Set<String> generatedIds = new HashSet<String>; private static final String LOCK_KEY = "lock.lock"; private static final long LOCK_EXPIRE = 5 * 1000; @Test public void test throws InterruptedException { Jedis jedis1 = new Jedis("localhost", 6379); Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE); IDGenerator g1 = new IDGenerator(lock1); IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1"); Jedis jedis2 = new Jedis("localhost", 6379); Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE); IDGenerator g2 = new IDGenerator(lock2); IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2"); Thread t1 = new Thread(consume1); Thread t2 = new Thread(consume2); t1.start; t2.start; Thread.sleep(20 * 1000); //Let two threads run for 20 seconds IDConsumeMission.stop; t1.join; t2.join; } static String time { return String.valueOf(System.currentTimeMillis / 1000); } static class IDConsumeMission implements Runnable { private IDGenerator idGenerator; private String name; private static volatile boolean stop; public IDConsumeMission(IDGenerator idGenerator, String name) { this.idGenerator = idGenerator; this.name = name; } public static void stop { stop = true; } public void run { System.out.println(time + ": consume " + name + " start "); while (!stop) { String id = idGenerator.getAndIncrement; if(generatedIds.contains(id)) { System.out.println(time + ": duplicate id generated, id = " + id); stop = true; continue; } generatedIds.add(id); System.out.println(time + ": consume " + name + " add id = " + id); } System.out.println(time + ": consume " + name + " done "); } } }To be clear, the way I stops two threads here is not very good. I did this for convenience, because it is just a test, so it is best not to do this.
Test results
There are too many things printed in 20s. The ones printed in the front are clear and only available when the run is almost finished. The screenshot below. This shows that this lock works normally:
When IDGererator is not locked (that is, IDGererator 's getAndIncrement method does not lock it when it obtains id internally), the test will not pass, and there is a very high probability that it will stop midway. The following are the test results when the lock is not locked:
This takes less than 1 second:
This one takes less than 1 second:
Conclusion
OK, the above is all about Java implementing distributed locks based on Redis. If you find any problems, you hope to correct them. I hope this article can help you study and work. If you have any questions, you can leave a message to communicate.