Prefacio
La sincronización dentro de un solo JVM es fácil de manejar. Simplemente use el bloqueo proporcionado por JDK directamente, pero la sincronización de procesos transversales es definitivamente imposible. En este caso, debe confiar en un tercero. Utilizo Redis aquí y, por supuesto, hay muchos otros métodos de implementación. De hecho, el principio basado en la implementación de Redis es bastante simple. Antes de leer el código, se recomienda que verifique primero el principio. Después de leer el código, debería ser más fácil de entender.
No implemento la interfaz java.util.concurrent.locks.Lock de JDK, pero personalizo uno, porque hay un método newCondition en JDK. No lo he implementado por el momento. Este bloqueo proporciona 5 variantes de métodos de bloqueo. Puede elegir cuál usar para obtener el bloqueo. Mi idea es que es mejor usar los métodos con devolución de tiempo de espera. Porque si este no es el caso, si Redis está colgado, el hilo siempre estará en el bucle muerto (sobre esto, debe optimizarse aún más. Si Redis está colgado, la operación de JEDIS definitivamente arrojará excepciones, etc.
paquete cc.lixiaohui.lock; import java.util.concurrent.timeunit; bloqueo de interfaz pública { / *** Bloqueo de adquisición de bloqueo, no respondiendo a la interrupción* / bloqueo void; / ** * Bloqueo de bloqueo de adquisición, no respondiendo a la interrupción * * @throws InterruptedException */ Void LockInterruptuiony lanza interruptedException; / *** Intente adquirir el bloqueo, regrese inmediatamente sin obtenerlo, no bloquear*/ boolean Trylock; / ** * El bloqueo de adquisición de bloqueo devuelto automáticamente por Tiempo de espera, no respondiendo a la interrupción * * @param Time * @param Unit * @return {@code true} Si el bloqueo se adquiere con éxito, {@code falso} Si el bloqueo no se recupera dentro del tiempo especificado * */ boolean Trylock (largo tiempo de tiempo, unidad de tiempo de tiempo); / *** El bloqueo de adquisición de bloqueo devuelto automáticamente por tiempo de espera, respuesta de respuesta** @param time* @param unit* @return {@code true} Si el bloqueo se adquiere correctamente, {@code false} Si el bloqueo no se retrocede dentro de la hora especificada* @throws interrumpedexception el subproceso actual que intenta el bloque InterruptedException; / *** Lanzamiento de liberación*/ Void desbloqueo; }Mire su implementación abstracta:
paquete cc.lixiaohui.lock; import java.util.concurrent.timeunit;/*** La implementación del esqueleto del bloqueo, los pasos reales para adquirir el bloqueo se implementan mediante subclases. * * @author lixiaohui * */public abstract class AbstractLock implements Lock { /** * <pre> * Whether visibility needs to be guaranteed here is worth discussing, because it is a distributed lock, * 1. It is also possible for multiple threads of the same jvm to use different lock objects, and in this case there is no need to guarantee visibility* 2. Multiple threads of the same jvm to use the same lock object, then visibility must be garantizado. * </pre> */ protegido volátil booleano bloqueado; / ** * El hilo actualmente contiene el bloqueo en jvm (si tiene uno) */ hilo privado exclusiveSownThread; Public void Lock {try {bloqueo (falso, 0, nulo, falso); } capt (interruptedException e) {// toDo ignore}} public void LockInterruptUmenty lanza InterruptedException {Lock (False, 0, NULL, True); } public boolean TryLock (Long Time, TimeUnit Unit) {try {return Lock (verdadero, tiempo, unidad, falso); } catch (interruptedException e) {// tODO ignore} return false; } public boolean trylockInterruptible (Long Time, TimeUnit Unit) lanza interruptedException {Lock de retorno (verdadero, tiempo, unidad, verdadero); } Public void desbloqueo {// TODO Compruebe si el hilo actual contiene el bloqueo if (thread.currentThread! = getExCluseSownerThread) {lanzar nueva ilegalMonitorStateException ("El hilo actual no contiene el bloqueo"); } desbloquear0; setExClusiveSownThread (NULL); } protegido void setExClusiveOnderThread (hilo de hilo) {exclusiveSownThread = thread; } El hilo final protegido getExClusiveOnderThread {return ExcusionOwnerThread; } Programa abstracto protegido desbloqueo0; / ** * Implementación del bloqueo de adquisición de bloqueo * * @param useTimeout * @param time * @param unit * @param interrumpir si debe responder a las interrupciones * @return * @throws InterruptedException */ protegido abstracto Boolean Lock (Boolean useTimeout, tiempo largo, tiempo de tiempo de tiempo booleano) lanzas interrupciones de interrupción;}} Basado en la implementación final de Redis, el código clave para adquirir y liberar el bloqueo está en el método lock y unlock0 el método de esta clase. Solo puedes mirar estos dos métodos y escribir uno completamente solo:
paquete cc.lixiaohui.lock; import java.util.concurrent.timeunit; import roedis.clients.jedis.jedis;/** * <pre> * bloqueo distribuido implementado por la operación setnx basada en redis * * Es mejor usar bloque href = "http://redis.io/commands/setnx"> referencia de operación setnc </a> * </pre> * * @author lixiaohui * */public class redisBasedDistributedLock extiende abstractlock {private jedis jedis; // El nombre de la cadena protegida de bloqueo LockKey; // La duración de la validez de la cerradura (MS) protegió los lockexpires largos; public RedisBasedDistributedlock (Jedis Jedis, String LockKey, Long LockExpires) {this.jedis = jedis; this.lockkey = LockKey; this.lockExpires = LocKExpires; } // Implementación del bloqueo de bloqueo de adquisición de bloqueo protegido de bloqueo booleano (boolean useTimeout, mucho tiempo, unidad de tiempo de tiempo, interrupción booleana) arroja interruptedException {if (interrupt) {checkinterruption; } Long Start = System.CurrentTimemillis; tiempo de espera largo = unit.tomillis (tiempo); // if! useTimeout, entonces es inútil mientras (useTimeOut? istimeout (inicio, timeout): true) {if (interrupt) {checkInterruption; } Long LocKExpireTime = System.CurrentTimemillis + LocKExpires + 1; // Lock TIMPLEOUT String StringOfLocKExpireTime = String.ValueOf (LocKExpiretime); if (jedis.setnx (LockKey, StringOfLockExpiretime) == 1) {// obtenido bloqueo // toDo obtuvo con éxito el bloqueo, establece el identificador relevante bloqueado = true; setExClusiveOnderThread (Thread.CurrentThread); devolver verdadero; } Valor de cadena = jedis.get (LockKey); if (value! = null && istimeExpired (valor)) {// bloquear está caducado // asume que múltiples subprocesos (no solares jvm) vienen aquí al mismo tiempo cadena OldValue = jedis.getSet (bloqueo de lockkey, stringOfLockExpiretime); // GetSet es atómico // pero el Value OldValue obtenido por cada hilo cuando se trata aquí es definitivamente imposible ser el mismo (porque GetSet es atómico) // El Value OldValue obtenido por unión todavía está expirado, entonces significa que el bloqueo se obtiene si (OldValue! = NULL && IStimeexPied (OldValue)) {// ToDo SuccessTy el bloqueo, el bloqueo relevante; setExClusiveOnderThread (Thread.CurrentThread); devolver verdadero; }} else {// TODO LOCK NO ESPIRA, ENTRAR NEXT BOCO RETRINIO}} return false; } public boolean TryLock {long LocKExpireTime = System.CurrentTimemillis + LocKExpires + 1; // Lock TimeOut Time String StringOfLocKExpireTime = String.ValueOf (LocKExpireTime); if (jedis.setnx (LockKey, StringOfLockExpireTime) == 1) {// Obtenga el bloqueo // TODO adquiere con éxito el bloqueo, establece el identificador relevante bloqueado = true; setExClusiveOnderThread (Thread.CurrentThread); devolver verdadero; } Valor de cadena = jedis.get (LockKey); if (value! = null && istimeExpired (valor)) {// el bloqueo está caducado // asumir múltiples subprocesos (no solo jvm) Ven aquí al mismo tiempo cadena OldValue = jedis.getSet (LockKey, StringOfLocKExpireTime); // GetSet es atómico // pero el Valor OldValue obtenido por cada hilo cuando se trata aquí es definitivamente imposible (porque GetSet es atómico) // Si el Valor OldValue todavía está expirado, entonces significa que ha obtenido el bloqueo si (OldValue! = NULL && IStimeExpired (OldValue)) {// a lo que se obtiene con éxito obtenido el bloqueo, bloquee el identificador relevante relevante; verdadero verdadero; setExClusiveOnderThread (Thread.CurrentThread); devolver verdadero; }} else {// TODO LOCK NO ESPIRA, ENTRAR SIGUIENTE RETIRO DE COMO} return false; } /*** Consultas si este bloqueo está sostenido por cualquier hilo. * * @return {@code true} Si algún hilo contiene este bloqueo y * {@code false} de lo contrario */ public boolean islocked {if (bloqueado) {return true; } else {string value = jedis.get (LockKey); // TODO Hay en realidad un problema aquí. Piense: cuando el método GET devuelva el valor, suponga que el valor ha expirado, // En este momento, otro nodo establece el valor y el bloqueo está en manos de otro hilo (el nodo se mantiene), y el siguiente juicio // no puede detectar esta situación. Sin embargo, este problema no debe causar otros problemas, porque el propósito de este método es // no es el control sincrónico, es solo un informe del estado de bloqueo. Return! IStimeExpired (valor); }} @Override protegido void desbloock0 {// tODO determina si el bloqueo expira el valor de la cadena = jedis.get (LockKey); if (! IStimeExPired (valor)) {doUnlock; }} private void checkinterruption lanza interruptedException {if (thread.currentThread.isinterrupted) {tirar nueva interrupciónxception; }} private boolean istrimeXpired (valor de cadena) {return long.parselong (valor) <system.currentTimemillis; } private boolean istimeout (inicio largo, tiempo de espera largo) {return start + timeOut> system.currentTimemillis; } Dounlock vacío privado {jedis.del (LockKey); }} Si cambia el método de implementación en el futuro (como zookeeper , etc.), puede heredar directamente AbstractLock e implementar ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) y unlock0 Method (llamada abstracción)
prueba
Simule el productor de identificación global y diseñe una clase IDGenerator . Esta clase es responsable de generar ID de incrementales globales. Su código es el siguiente:
paquete cc.lixiaohui.lock; import java.math.biginteger; import java.util.concurrent.timeunit;/** * Simular la generación de identificación * @author lixiaohui * */public class idgenerator {private estático biginteger id = biginteger.valueof (0); bloqueo final privado de bloqueo; Incremento de BigInteger final estático privado = BigInteger.ValueOf (1); Public IdGenerator (bloqueo de bloqueo) {this.lock = bloqueo; } public String getAndIrcrement {if (Lock.trylock (3, TimeUnit.Seconds)) {try {// para obtener el bloqueo aquí y acceder al retorno de recursos de área crítica getAndincrement0; } Finalmente {Lock.unlock; }} return null; // regresa getandincrement0; } cadena privada getandincrement0 {String s = id.ToString; id = id.add (incremento); regreso s; }} Lógica principal de prueba: se abren dos hilos en el mismo JVM en un bucle muerto (no hay un intervalo entre los bucles, si lo existe, la prueba no tendrá sentido) para obtener ID (no soy un bucle muerto sino ejecutar por 20), obtener la ID y almacenarlo en el mismo Set . Antes de almacenarse, verifique si la ID existe en set . Si ya existe, deje que ambos hilos se detengan. Si el programa puede ejecutar 20 segundos normalmente, significa que este bloqueo distribuido puede cumplir con los requisitos. El efecto de dicha prueba debe ser el mismo que el de diferentes JVM (es decir, en un entorno distribuido real). El siguiente es el código de la clase de prueba:
paquete cc.lixiaohui.distributedlock.distributedlock; import java.util.hashset; import java.util.set; import org.junit.test; import redis.clients.jedis.jedis; import cc.lixiaohui.lock.idenerator; import cc.lixiaohui.lock; import cc.lixiaohui.lock.redisBasedDistributedLock; public class IDGeneratortest {private static set <String> generatedIds = new Hashset <String>; static final static privado list_key = "Lock.lock"; PRIVADO ESTÁTICO FINAL LARGO LARGO_EXPIRE = 5 * 1000; @Test public void Test lanza interruptedException {JEDIS JEDIS1 = new JEdis ("LocalHost", 6379); LOCK LOCK1 = new RedisBasedDistributedLock (JEDIS1, LOCK_KEY, LOCK_EXPIRE); IdGenerator G1 = New IdGenerator (Lock1); Idconsumemission consumen1 = new Idconsumemission (g1, "consumen1"); JEDIS JEDIS2 = new Jedis ("Localhost", 6379); LOCK LOCK2 = new RedisBasedDistributedLock (JEDIS2, LOCK_KEY, LOCK_EXPIRE); IdGenerator G2 = new IdGenerator (Lock2); Idconsumemission consumen2 = new Idconsumemission (G2, "consumen2"); Hilo t1 = nuevo hilo (consumo1); Hilo t2 = nuevo hilo (consumo2); t1.star; t2.star; Thread.sleep (20 * 1000); // deja que dos hilos funcionen durante 20 segundos idconsumemission.stop; T1.JOIN; t2.join; } tiempo de cadena estática {return String.ValueOf (System.CurrentTimemillis / 1000); } La clase estática IDConsumemission implementa Runnable {private idgenerator idgenerator; nombre de cadena privada; parada booleana volátil estática privada; public IDConsumEmission (IdGenerator IDGenerator, Nombre de cadena) {this.idGenerator = IdGenerator; this.name = name; } Public static void stop {stop = true; } public void run {System.out.println (Time + ": Consume" + Nombre + "Inicio"); while (! stop) {string id = idgenerator.getAndIncrement; if (generatedIds.contains (id)) {System.out.println (Time + ": duplicate ID generado, id =" + id); parar = verdadero; continuar; } generatedIds.Add (id); System.out.println (Time + ": Consume" + Name + "Agregar id =" + id); } System.out.println (tiempo + ": consumo" + nombre + "hecho"); }}}Para ser claros, la forma en que detengo dos hilos aquí no es muy bueno. Hice esto por conveniencia, porque es solo una prueba, por lo que es mejor no hacer esto.
Resultados de las pruebas
Hay demasiadas cosas impresas en los años 20. Los impresos en la parte delantera están clear y solo están disponibles cuando la ejecución está casi terminada. La captura de pantalla a continuación. Esto muestra que este bloqueo funciona normalmente:
Cuando IDGererator no está bloqueado (es decir, el método getAndIncrement de IDGererator no lo bloquea cuando obtiene id internamente), la prueba no pasará y hay una probabilidad muy alta de que se detenga a mitad de camino. Los siguientes son los resultados de la prueba cuando el bloqueo no está bloqueado:
Esto toma menos de 1 segundo:
Este toma menos de 1 segundo:
Conclusión
Ok, lo anterior tiene que ver con Java implementando bloqueos distribuidos basados en Redis. Si encuentra algún problema, espera corregirlos. Espero que este artículo pueda ayudarlo a estudiar y trabajar. Si tiene alguna pregunta, puede dejar un mensaje para comunicarse.