Vorwort
Die Synchronisation innerhalb eines einzelnen JVM ist leicht zu handhaben. Verwenden Sie einfach das von JDK bereitgestellte Schloss direkt, aber die Cross-Process-Synchronisation ist definitiv unmöglich. In diesem Fall müssen Sie sich auf einen Dritten verlassen. Ich verwende Redis hier und natürlich gibt es viele andere Implementierungsmethoden. Tatsächlich ist das auf Redis -Implementierung basierende Prinzip recht einfach. Vor dem Lesen des Code wird empfohlen, zuerst das Prinzip zu überprüfen. Nach dem Lesen des Code sollte es einfacher zu verstehen sein.
Ich implementiere nicht die java.util.concurrent.locks.Lock -Schnittstelle von JDK, sondern passt eine an, da es in JDK eine newCondition -Methode gibt. Ich habe es vorerst nicht implementiert. Dieses Schloss liefert 5 Varianten von Sperrmethoden. Sie können auswählen, welche Sie zum Erhalten des Schlosses verwendet werden sollen. Meine Idee ist, dass es am besten ist, die Methoden mit Timeout -Return zu verwenden. Denn wenn dies nicht der Fall ist, wenn Redis aufgehängt wird, befindet sich der Thread immer in der toten Schleife (darüber sollte er weiter optimiert werden. Wenn Redis aufgehängt wird, wird Jedis 'Operation definitiv Ausnahmen und so weiterwerfen.
Paket cc.lixiaohui.lock; import java.util.concurrent / ** * Blockierungserfassungsperre, nicht auf Interrupt reagieren / *** Versuchen Sie, das Schloss zu erwerben, kehren Sie sofort zurück, ohne es zu erhalten, nicht blockieren*/ boolean trylock; / ** * Die Blockierungs -Erfassungsschloss wurde automatisch nach Timeout zurückgegeben, nicht auf Interrupt * * @param time * @param einheit * @return {@Code true} Wenn die Sperre erfolgreich erfasst wird, {@code false} Wenn die Sperre nicht in der angegebenen Zeit * */ boolean trylock (Long TimeUnit Unit) abgerufen wird; / *** Die Blockierungs -Erfassungsschloss, die automatisch nach Timeout zurückgegeben wurde. InterruptedException; / *** Freisetzende Schloss*/ void entsperr; }Schauen Sie sich die abstrakte Implementierung an:
Paket cc.lixiaohui.lock; import java.util.concurrent * * @Author lixiaohui * * /public abstract class AbstractLock implementiert das Lock { /** * <pre> * Ob die Sichtbarkeit garantiert werden muss, da es sich um ein verteiltes Sperre handelt. muss garantiert werden. * </ pre> */ geschützte flüchtige boolean gesperrt; / ** * Der Thread hält derzeit das Sperre in JVM (falls eins) */ privater Thread ExclusiveWerThread; public void lock {try {lock (false, 0, null, false); } catch (InterruptedException e) {// Todo ignorieren}} public void lockEnterruptisle löst InterruptedException {lock (false, 0, null, true) aus; } public boolean trylock (lange Zeit, Zeiteinheit) {try {return lock (true, time, uneinheit, false); } catch (InterruptedException e) {// todo ignorore} return false; } public boolean trylockinterriptable (lange Zeit, Timeunit Unit) löst unterbrochene Ausnahme {return lock (true, time, event, true); } public void entsperr {// toDo prüfen Sie, ob der aktuelle Thread die Sperre enthält, wenn (thread.currentThread! } Unlock0; setExclusiveCernerThread (NULL); } Protected void setExclusiveOnerThread (Thread -Thread) {exklusivOwnerThread = thread; } Protected Final Thread getExclusiveCernerThread {return exclusiveWerThread; } geschützte abstrakte void Unlock0; / ** * Implementierung von Blockierungserfassungsschloss * * @param usetimeout * @param time * @param unit * @param interrupt, ob auf Interrupts * @Return * @throws InterruptedException */ geschütztes abstraktes Boolean Lock (boolean usetimeout, langjährige, timeUnit unit, boolean Interruwt) reagiert werden soll. Basierend auf der endgültigen Implementierung von Redis befindet sich der Schlüsselcode zum Erwerb und der Veröffentlichung der Sperre in der lock und unlock0 der Methode dieser Klasse. Sie können sich diese beiden Methoden nur ansehen und sich selbst komplett selbst schreiben:
Paket cc.lixiaohui.lock; import java.util.concurrent href = "http://redis.io/commands/setnx"> setNC Operation Referenz </a> * </pre> * @author lixiaohui * */public class wieder aufgeschüttetem DistributedLock erweitert abstractlock {private jedis jedis; // den Namen des sperren geschützten String -String -Lockkey; // Die Gültigkeitsdauer des Lock -Lockexpires (MS) geschützt; public recisbased diviTributedlock (Jedis jedis, String Lockkey, Long lockexpires) {this.jedis = jedis; this.lockKey = lockkey; this.lockexpires = lockexpires; } // Implementierung der blockierenden Erfassungsschloss geschützte Boolesche Schloss (boolean usetimeout, lange Zeit, Zeiteinheit, Boolean Interrupt) löst InterruptedException aus {if (interrupt) {{achteInterrupion; } long start = system.currentTimemillis; langfristig = Einheit.Tomillis (Zeit); // if! usetimeout, dann ist es nutzlos, während (usetimeout? } long lockexpiretime = system.currentTimemillis + lockKexpires + 1; // Timeout String -StringofocoCexpiretime = String.ValueOf (lockKexpiretime); if (jedis.setnx (lockkey, stringofoktexpiretime) == 1) {// erhalten lock // Todo die Sperre erfolgreich erhalten, legte die relevante Kennung gesperrt = true; setExclusiveWererThread (Thread.CurrentThread); zurückkehren; } String value = jedis.get (lockkey); if (value! // Getset ist atomisch // aber der alte Wert, der von jedem Thread erhalten wird, wenn er hierher kommt, ist definitiv unmöglich, dasselbe zu sein (weil Getset atomic ist) // Der alte Value, der durch den Beitritt gewonnen wird, ist immer noch abgelaufen. Dann bedeutet dies, dass das Schloss erhalten wird, wenn (OldValue! setExclusiveWererThread (Thread.CurrentThread); zurückkehren; }} else {// Todo -Sperre ist nicht abgelaufen. Geben Sie die nächste Loop -Wiederholung ein}} zurück. } public boolean trylock {long lockexpiretime = system.currentTimemillis + lockKexpires + 1; // Timeout -Zeit -String -StringOfoktexpiretime = String.ValueOf (lockexpiretime); if (jedis.setnx (lockkey, stringofoktexpiretime) == 1) {// Erwerben Sie die Sperre // Todo erfolgreich die relevante Kennung locked = true; setExclusiveWererThread (Thread.CurrentThread); zurückkehren; } String value = jedis.get (lockkey); if (value! // Getset ist atomisch // aber der alte Wert, der von jedem Thread erhalten wird, wenn er hier ist, ist definitiv unmöglich (weil Getset atomic ist) // Wenn der alte Wert, den Sie erhalten, immer noch abgelaufen ist, bedeutet dies, dass Sie das Schloss erhalten haben, wenn (oldValue! setExclusiveWererThread (Thread.CurrentThread); zurückkehren; }} else {// Todo -Sperre ist nicht abgelaufen, geben Sie die nächste Schleife erneut ein} zurück. } /*** Abfragen, wenn dieses Schloss von einem Thread gehalten wird. * * @return {@code true} Wenn ein Thread dieses Sperre hält und * {@code false} sonst */ public boolean isLocked {if (gesperrt) {return true; } else {String value = jedis.get (lockkey); // todo Es gibt hier tatsächlich ein Problem. Denken Sie an: Wenn die GET -Methode den Wert zurückgibt, gehen Sie davon aus, dass der Wert abgelaufen ist, // In diesem Moment legt ein anderer Knoten den Wert fest, und das Schloss wird von einem anderen Thread (der Knoten gilt) und das nächste Urteil // kann diese Situation nicht erkennen. Dieses Problem sollte jedoch keine anderen Probleme verursachen, da der Zweck dieser Methode // keine synchrone Kontrolle ist, sondern nur ein Bericht über den Sperrstatus. return! isTimeExpired (Wert); }} @Override Protected void Unlock0 {// todo bestimmt, ob die Sperre String value = jedis.get (lockkey) ausgibt; if (! isTimeExpired (Wert)) {dounlock; }} private void CheckInterrupion löscht InterruptedException {if (thread.currentThread.Interrupted) {neue InterruptedException; }} private boolean iStimeExpired (String -Wert) {return long.parselong (value) <System.currentTimemillis; } private boolean isTimeout (lange Start, langfristig) {return start + timeout> system.currentTimemillis; } private void dounlock {jedis.del (lockkey); }} Wenn Sie die Implementierungsmethode in Zukunft (z. B. zookeeper usw.) ändern, können Sie AbstractLock direkt erben und lck ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) und unlock0 Methode (sogenannte Abstraktion) implementieren.
prüfen
Simulieren Sie den globalen ID -Züchter und entwerfen Sie eine IDGenerator -Klasse. Diese Klasse ist für die Generierung globaler inkrementeller IDs verantwortlich. Sein Code ist wie folgt:
Paket cc.lixiaohui.lock; import Java.math.biginTier; Import Java.util.Concurrent Private Final Lock Lock; private statische Final BigInenger Increment = BigInteger.Valueof (1); public idgenerator (lock lock) {this.lock = lock; } public String getAndIncrement {if (lock.tryLock (3, timeUnit.seconds)) {try {// todo den Sperre hier erhalten und auf die ressourcenrendige ressourcenrückgegenden getandIncrement0 zugreifen. } endlich {lock.unlock; }} return null; // return getandincrement0; } private String getAndIncrement0 {String s = id.toString; id = id.add (Inkrement); Rückkehr s; }} Test Hauptlogik: Zwei Themen werden in einer toten Schleife in derselben JVM geöffnet (es gibt kein Intervall zwischen den Schleifen. Wenn es vorhanden ist, ist der Test bedeutungslos), um ID zu erhalten (ich bin keine tote Schleife, sondern für 20er Jahre), holen Sie sich die ID und speichern Sie sie im selben Set . Überprüfen Sie, ob die ID im set vorhanden ist. Wenn es bereits existiert, lassen Sie beide Threads anhalten. Wenn das Programm normalerweise 20 Sekunden laufen kann, bedeutet dies, dass diese verteilte Schloss die Anforderungen erfüllen kann. Die Auswirkung eines solchen Tests sollte der gleiche sein wie der von verschiedenen JVMs (dh in einer realen verteilten Umgebung). Das Folgende ist der Code der Testklasse:
Paket cc.lixiaohui.distributedlock.distributedlock; Import Java.util.hashset; Import Java.util.set; import org.junit.test; Imorye.clients.jedis.jedis; Import cc.loxiaohui.lock.Lock.Lock.Lock.Lock.Lock.Lock.Lock.Lock.Lock.Lock.Denerator; cc.lixiaohui.lock.redisBasedDistributedlock; public class idgeneratestest {private static set <string> generatedIds = new Hashset <string>; private statische endgültige String lock_key = "lock.lock"; private statische endgültige long lock_expire = 5 * 1000; @Test public void test wirft unterruptedException {jedis jedis1 = new Jedis ("localhost", 6379); Lock lock1 = new recispaseddistributedlock (jedis1, lock_key, lock_expire); IDGenerator g1 = neuer IDGenerator (Lock1); Idconsumemission conseum1 = new IdConsumemission (G1, "conneum1"); Jedis Jedis2 = New Jedis ("Localhost", 6379); Lock lock2 = new recispaseddistributedlock (jedis2, lock_key, lock_expire); IDGenerator g2 = neuer IDGenerator (Lock2); Idconsumemission conseum2 = new IdConsumemission (G2, "conseum2"); Thread T1 = neuer Thread (Consume1); Thread T2 = neuer Thread (Consume2); t1.start; t2.Start; Thread.sleep (20 * 1000); // zwei Threads 20 Sekunden lang idconsumemissions laufen lassen. T1.Join; t2.join; } statische String -Zeit {return string.ValueOf (System.currentTimemillis / 1000); } statische Klassen -IDConsumemission implementiert runnable {private idGenerator idGenerator; privater Zeichenfolge Name; privater statischer, flüchtiger boolescher Stopp; public idConsumemission (IDGenerator idGenerator, String -Name) {this.idGenerator = idGenerator; this.name = name; } public static void stop {stop = true; } public void run {System.out.println (Zeit + ": Consume" + Name + "start"); while (! stop) {string id = idGenerator.getandIncrement; if (generatedids.contains (id)) {System.out.println (Zeit + ": Duplicate ID generiert, id =" + id); stop = true; weitermachen; } generatedIds.add (id); System.out.println (Zeit + ": konsumieren" + name + "add id =" + id); } System.out.println (Zeit + ": konsumieren" + name + "done"); }}}Um klar zu sein, ist die Art und Weise, wie ich hier zwei Threads stoppt, nicht sehr gut. Ich habe das aus Bequemlichkeit gemacht, weil es nur ein Test ist, also ist es am besten, dies nicht zu tun.
Testergebnisse
Es gibt zu viele Dinge in 20ern. Die vorne clear und werden nur dann verfügbar, wenn der Lauf fast fertig ist. Der Screenshot unten. Dies zeigt, dass dieses Schloss normal funktioniert:
Wenn IDGererator nicht gesperrt ist (dh die getAndIncrement -Methode des IDGererator sperrt sie nicht, wenn sie id intern erhält), wird der Test nicht bestehen, und es besteht eine sehr hohe Wahrscheinlichkeit, dass er auf halbem Weg anhält. Im Folgenden sind die Testergebnisse aufgeführt, wenn das Schloss nicht gesperrt ist:
Dies dauert weniger als 1 Sekunde:
Dieser dauert weniger als 1 Sekunde:
Abschluss
OK, es geht um die oben genannte Umsetzung von Java, die verteilte Sperren basierend auf Redis implementieren. Wenn Sie Probleme finden, hoffen Sie, sie zu korrigieren. Ich hoffe, dieser Artikel kann Ihnen helfen, zu studieren und zu arbeiten. Wenn Sie Fragen haben, können Sie eine Nachricht zur Kommunikation überlassen.