Предисловие
Синхронизация в одном JVM легко справиться. Просто используйте блокировку, предоставленную JDK напрямую, но синхронизация кросс-обработки определенно невозможна. В этом случае вы должны полагаться на третью сторону. Я использую Redis здесь, и, конечно, есть много других методов реализации. На самом деле, принцип, основанный на реализации Redis, довольно прост. Перед чтением кода рекомендуется сначала проверить принцип. После прочтения кода это должно быть проще для понимания.
Я не внедряю интерфейс JDK java.util.concurrent.locks.Lock , но настраиваю один, потому что в JDK есть метод newCondition . Я не реализовал это на данный момент. Этот блокировка предоставляет 5 вариантов методов блокировки. Вы можете выбрать, какой из них использовать для получения блокировки. Моя идея заключается в том, что лучше всего использовать методы с возвратом тайм -аута. Потому что, если это не так, если Redis повешен, поток всегда будет в мертвой цикле (об этом, это должно быть еще более оптимизировано. Если Redis повешен, операция Jedis определенно сделает исключения и так далее. Вы можете определить механизм, чтобы уведомить пользователей, которые используют этот замок, когда Redis повесится, или потоки).
Пакет cc.lixiaohui.lock; import java.util.concurrent.timeUnit; блокировка публичного интерфейса { / *** Блокировка блокировки приобретения, не отвечая на прерывание* / void Lock; / ** * Блокировка блокировки сбора, не отвечая на прерывание * * @Throws прерывание / *** Попробуйте приобрести блокировку, немедленно верните, не получая его, не блокируя*/ boolean trylock; / ** * Блокировка сбора блокировки автоматически возвращается тайм -аутом, не отвечая на прерывание * * @param Time * @param Unit * @return {@code true} Если блокировка успешно получена, {@code false}, если блокировка не получена в течение указанного времени * */ boolean trylock (давнее время, время Unit); / *** Блокировка сбора блокировки, автоматически возвращаемое Timeout, прерывание ответа** @param Time* @param Unit* @return {@code true} Если блокировка успешно получена, {@code false} Если блокировка не получена в указанное время* @Throws RepruptExceptex Прерванная экспрессия; / *** Выпустить блокировку*/ void разблокировать; }Посмотрите на его абстрактную реализацию:
Пакет cc.lixiaohui.lock; import java.util.concurrent.timeUnit;/*** Реализация скелета блокировки, реальные шаги по приобретению блокировки реализованы подклассами. * * @author lixiaohui * * /public абстрактного класса Abstractlock реализует Lock { /** * <pre> *, стоит ли здесь гарантировать видимость, потому что это распределенная блокировка, * 1. Также для нескольких потоков одного и того же JVM используется различные объекты блокировки, и в этом случае не требуется, чтобы он не был гарантировать, что и тот же jvm использует то же самое, что и то же, и то же, и то же самое можно использовать, а затем и то же самую то же самое, что и то же самое, и то же самое можно использовать, и то же самое, что и то же самое можно использовать, и то же самое, что и то же самое можно использовать, и то же самое, что и то же самое можно использовать. гарантированно. * </ pre> */ защищенная летучая логическая заблокированная; / ** * Поток в настоящее время удерживает блокировку в jvm (если есть один) */ частная потока ExclusiveWartHread; public void lock {try {lock (false, 0, null, false); } catch (прерывание Exception e) {// todo игнорировать}} public void lockErrastress trows прерывание {lock (false, 0, null, true); } public boolean trylock (долгое время, Unit Unit) {try {return Lock (True, Time, Unit, false); } catch (прерывание Exception e) {// todo игнорировать} вернуть false; } public boolean trylockErraintaill (давно, время, блок TimeUnit), бросает прерванную экспрессию {return lock (true, time, int, true); } public void разблокировка {// toDo проверяет, удерживает ли текущий поток блокировку if (thread.currentThread! } unlock0; SetExclusiveOwnterThread (null); } Защищенный void setExclusiveWareThread (поток потока) {exclusiveOwnerThread = Thread; } защищенный окончательный поток getExclusioushownerthread {return exclusiveOwnerThread; } защищенный абстрактный void unlock0; / ** * Реализация блокировки блокировки приобретения * * * @param usetimeout * @param time * @param Unit * @param прерывать, отвечать ли прерывания * @return * @throws retrupteDexception */ Защищенное абстрактное логическое блокировка (Boolean useTime, долгое время, временная единица, логическая межпреда) TurruptedException; Основываясь на окончательной реализации Redis, код ключа для получения и выпуска блокировки находится в методе lock и метода unlock0 этого класса. Вы можете взглянуть только на эти два метода и написать его полностью самостоятельно:
Пакет cc.lixiaohui.lock; import java.util.concurrent.timeUnit; import redis.clients.jedis.jedis;/** * <pre> * Распределенная блокировка, реализованная с помощью операции SetNx на основе Redis * * * Лучше всего использовать блокировку (долгое время, Unit), когда приобретают блокировки, чтобы избежать сетевых проблем, чтобы заблокировать все время. href = "http://redis.io/commands/setnx"> setnc repormers chourt </a> * </pre> * * @author lixiaohui * */public class RedIsBasedDistributedlock Extrablock Abstractlock {private jedis jedis; // Имя блокировки защищенной строки LockKey; // продолжительность достоверности блокировки (MS), защищенная длинными локкексами; public redisbaseddistributedlock (jedis jedis, String Lockkey, Long Lockexpires) {this.jedis = jedis; this.lockkey = lockkey; this.lockexpires = lockexpires; } // Реализация блокировки блокировки сбора защищенного логического блокировки (Boolean useTimeout, долгое время, единица времени, логическое прерывание), бросает прерывания. } long start = System.currentTimeMillis; Длинный тайм -аут = unit.tomillis (время); // if! usetimeout, тогда это бесполезно, пока (useTimeout? } long lockexpireTime = System.currentTimeMillis + lockexpires + 1; // Замок Timeout String stringOfLocKexPireTime = String.ValueOF (lockexPireTime); if (jedis.setnx (lockkey, stringoflockexpiretime) == 1) {// Полученная блокировка // todo успешно получил блокировку, установите соответствующий идентификатор locked = true; SetExclusiveOwnterThread (Thread.CurrentThread); вернуть истину; } String value = jedis.get (lockkey); if (value! = null && istimeexpired (value)) {// lock истекает // Предположим, что несколько потоков (не-низи с jvm) приходят сюда в одну и ту же строку oldvalue = jedis.getset (lockkey, stringoflockexpiretime); // getset is atomic // But the oldValue obtained by each thread when it comes here is definitely impossible to be the same (because getset is atomic) // The oldValue obtained by joining is still expired, then it means that the lock is obtained if (oldValue != null && isTimeExpired(oldValue)) { // TODO successfully obtains the lock, set the relevant identifier locked = true; SetExclusiveOwnterThread (Thread.CurrentThread); вернуть истину; }} else {// todo lock не истекает, введите следующий цикл повторения}} return false; } public boolean trylock {long lockexpiretime = System.currentTimeMillis + lockexpires + 1; // Замок временной интервал string stringOflockexpireTime = string.valueof (lockexPireTime); if (jedis.setnx (lockkey, stringoflockexpiretime) == 1) {// получить блокировку // todo успешно получает блокировку, установите соответствующий идентификатор locked = true; SetExclusiveOwnterThread (Thread.CurrentThread); вернуть истину; } String value = jedis.get (lockkey); if (value! = null && istimeexpired (value)) {// Замок истек // Предположим, что несколько потоков (не одно JVM). // getSet является атомным //, но старого, полученное каждым потоком, когда он приходит сюда, определенно невозможно (потому что getSet является атомным) // Если старое, которое вы получаете, все еще истек, то это означает, что у вас есть блокировка, если (OldValue! = NULL && IStimeExpied (OldValue)) {// toDo успешно достигнуто заблокирован, идентифицированный; SetExclusiveOwnterThread (Thread.CurrentThread); вернуть истину; }} else {// todo lock не истекает, введите следующий цикл повторения} вернуть false; } /*** Запросы, если эта блокировка удерживается любым потоком. * * @return {@code true}, если какой -либо поток удерживает эту блокировку и * {@code false} иначе */ public boolean islocked {if (locked) {return true; } else {string value = jedis.get (lockkey); // Тодо Здесь есть проблема. Подумайте: когда метод GET возвращает значение, предположим, что значение истек срок действия, // в данный момент, другой узел устанавливает значение, и блокировка удерживается другим потоком (узел узел), а следующее решение // не может обнаружить эту ситуацию. Тем не менее, эта проблема не должна вызывать другие проблемы, поскольку целью этого метода является // не синхронное управление, это просто отчет о статусе блокировки. возврат! Istiexpired (значение); }} @Override Protected void unlock0 {// todo определяет, истекает ли блокировка string value = jedis.get (lockkey); if (! Istiexpired (value)) {Dounlock; }} private void checketterruption бросы прерывания {if (thread.currentThread.isErenterted) {Throw New SurpruptEdException; }} private boolean istimeexpired (string value) {return long.parselong (value) <system.currenttimemillis; } private boolean istimeout (Long Start, Long Timeout) {return Start + Timeout> System.currentTimeMillis; } private void dounlock {jedis.del (lockkey); }} Если вы измените метод реализации в будущем (например, zookeeper и т. Д.), То вы можете напрямую наследить AbstractLock и реализовать L ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) и метод unlock0 (так называемая абстракция)
тест
Моделируйте Global Id Grower и разработайте класс IDGenerator . Этот класс отвечает за генерацию глобальных инкрементных идентификаторов. Его код заключается в следующем:
Пакет cc.lixiaohui.lock; import java.math.biginteger; import java.util.concurrent.timeUnit;/** * Совместное использование идентификационного поколения * @author lixiaohui * */public class idgenerator {private static biginteger id = biginteger.valueof (0); Частный финальный замок замок; Частный статический финальный BigInteger Increment = BigInteger.valueof (1); public idGenerator (блокировка блокировки) {this.lock = lock; } public String getAndIncrement {if (lock.trylock (3, timeUnit.seconds)) {try {// todo получить блокировку здесь и получить доступ к ресурсу критической области return getandIncrement0; } наконец {lock.unlock; }} return null; // return getAndIncrement0; } частная строка getAndIncrement0 {string s = id.toString; id = id.add (urment); возврат S; }} Тестирование главной логики: две потоки открываются в одном и том же JVM в мертвой петле (интервал между петель нет, если есть, тест будет бессмысленным), чтобы получить ID (я не мертвый цикл, но бегаю в течение 20 с), получить идентификатор и хранить его в том же Set . Прежде чем он будет сохранен, проверьте, существует ли ID в set . Если это уже существует, пусть оба потока остановится. Если программа может работать на 20 секунд обычно, это означает, что этот распределенный блокировка может соответствовать требованиям. Эффект такого теста должен быть таким же, как и у различных JVM (то есть в реальной распределенной среде). Ниже приведен код тестового класса:
Пакет cc.lixiaohui.distributedlock.distributedlock; import java.util.hashset; import java.util.set; import org.junit.test; import redis.clients.jedis.jedis; импорт cc.lixiohui.lock.idenerator; import Cc.lixohui.lock.lock; cc.lixiaohui.lock.redisbaseddistributedlock; public class idgeneratortest {private static set <string> Generatedids = new Hashset <string>; частная статическая конечная строка lock_key = "lock.lock"; Частный статический окончательный long lock_expire = 5 * 1000; @Test public void test test test treamptedException {jedis jedis1 = new jedis ("localhost", 6379); Lock Lock1 = new RedisBasedDistributedLock (jedis1, lock_key, lock_expire); IdGenerator G1 = новый IdGenerator (Lock1); IdConsumemission Usume1 = новая iDconsumemission (g1, "uncome1"); Jedis jedis2 = new jedis ("Localhost", 6379); Lock Lock2 = new RedisBasedDistributedlock (jedis2, lock_key, lock_expire); IdGenerator G2 = новый IdGenerator (Lock2); Idconsumemission unsume2 = new idconsumemission (g2, "unsume2"); Потока T1 = новый поток (потребление1); Потока T2 = новый поток (потребление2); T1.Start; T2.Start; Thread.sleep (20 * 1000); // Пусть два потока запускаются в течение 20 секунд idconsumemission.stop; T1.join; t2.join; } static String Time {return string.valueof (System.currentTimeMillis / 1000); } статический класс iDconsumemission реализует runnable {private idGenerator idGenerator; Приватное название строки; Частная статическая летучая логическая остановка; public idconsumemission (idgenerator idgenerator, string name) {this.idgenerator = idGenerator; this.name = name; } public static void Stop {stop = true; } public void run {System.out.println (время + ": потребление" + name + "start"); while (! Stop) {String id = idGenerator.getAndIncrement; if (GeneratedIds.contains (id)) {System.out.println (Time + ": Duplicate ID, сгенерированный, id =" + id); Stop = true; продолжать; } genedIds.add (id); System.out.println (время + ": потребление" + name + "add id =" + id); } System.out.println (время + ": потребление" + name + "dode"); }}}Чтобы быть ясным, то, как я останавливаю две темы здесь, не очень хорошо. Я сделал это для удобства, потому что это просто тест, поэтому лучше не делать этого.
Результаты теста
Слишком много вещей, напечатанных в 20 -х годах. Те, которые напечатаны спереди, clear и доступны только после того, как пробег почти закончен. Скриншот ниже. Это показывает, что эта блокировка работает нормально:
Когда IDGererator не заблокирован (то есть метод getAndIncrement IDGererator не блокирует его, когда он получает id внутри внутреннего времени), тест не пройдет, и существует очень высокая вероятность, что он остановится на полпути. Ниже приведены результаты теста, когда блокировка не заблокирован:
Это занимает менее 1 секунды:
Это занимает менее 1 секунды:
Заключение
ОК, вышеупомянутое о Java реализует распределенные замки на основе Redis. Если вы найдете какие -либо проблемы, вы надеетесь исправить их. Я надеюсь, что эта статья поможет вам учиться и работать. Если у вас есть какие -либо вопросы, вы можете оставить сообщение для общения.