Prefácio
A sincronização em uma única JVM é fácil de lidar. Basta usar o bloqueio fornecido diretamente pelo JDK, mas a sincronização de processos cruzados é definitivamente impossível. Nesse caso, você deve confiar em terceiros. Eu uso o Redis aqui e, é claro, existem muitos outros métodos de implementação. De fato, o princípio baseado na implementação do Redis é bastante simples. Antes de ler o código, é recomendável que você verifique primeiro o princípio. Depois de ler o código, deve ser mais fácil de entender.
Não implemento o java.util.concurrent.locks.Lock da JDK, mas personalizo um, porque existe um método newCondition no JDK. Não o implementei por enquanto. Este bloqueio fornece 5 variantes de métodos de bloqueio. Você pode escolher qual deles usar para obter o bloqueio. Minha ideia é que é melhor usar os métodos com o retorno do tempo limite. Porque se não for esse o caso, se Redis for pendurado, o fio sempre estará no loop morto (sobre isso, deve ser otimizado mais. Se Redis for pendurado, a operação de Jedis definitivamente lançará exceções e assim
pacote cc.lixiaohui.lock; importar java.util.concurrent.timeunit; bloqueio de interface pública { / *** bloqueando o bloqueio de aquisição, não respondendo à interrupção* / void Lock; / ** * bloqueando o bloqueio de aquisição, não respondendo à interrupção * * @Throws InterruptEdException */ void Lock interruptivelmente lança interruptedException; / *** Tente adquirir o bloqueio, retorne imediatamente sem obtê -lo, sem bloquear*/ boolean Trylock; / ** * O bloqueio de bloqueio retornou automaticamente por tempo limite, não respondendo à interrupção * * @param time * @param unidade * @return {@code true} Se o bloqueio for adquirido com sucesso, {@code false} se o bloqueio não for recuperado dentro do tempo especificado * */ boolean Trylock (long -time time, timeUnit); / *** O bloqueio de bloqueio retorno automaticamente retornado por tempo limite, interrupção de resposta** @param horário* @param unidade* @return {@code true} Se o bloqueio for adquirido com sucesso, {@code false} se o bloqueio não for recuperado dentro do tempo especificado* @throws interruptException A linha de alcance é interrompida* InterrompedException; / *** Libere o bloqueio*/ void de desbloqueio; }Veja sua implementação abstrata:
pacote cc.lixiaohui.lock; importar java.util.concurrent.timeUnit;/*** A implementação do esqueleto do bloqueio, as etapas reais para adquirir o bloqueio são implementadas por subclasses. * * @Author LXIAOHUI * * /Public Resumo Class AbstractLock implementa Lock { /** * <pre> * Se a visibilidade precisa ser garantida aqui, vale a pena discutir, porque é uma falha distribuída, * 1. Também é possível que os mesmos fios da mesma JVM usem diferentes objetos de trava e, nesse caso. estar garantido. * </pre> */ protegido booleano volátil protegido bloqueado; / ** * O tópico atualmente segurando o bloqueio na JVM (se tiver um) */ thread privado exclusivewnerthread; public void Lock {tente {Lock (false, 0, nulo, false); } catch (interruptedException e) {// TODO ignorar}} public void Lock interruptivelmente lança interruptedException {Lock (false, 0, nulo, true); } public boolean trylock (longo tempo, unidade de unidade de tempo) {tente {return bloqueio (true, tempo, unidade, false); } catch (interruptedException e) {// TODO ignorar} retornar false; } public boolean TrylockInterruptível (muito tempo, unidade de unidade de tempo) lança interruptedException {return bloqueio (true, tempo, unidade, true); } public void desbloquear {// TODO Verifique se o thread atual segura o bloqueio if (thread.currentThread! = getExclusivewnerthread) {lança new IllegalMonitorStateException ("Thread atual não segura o bloqueio"); } desbloqueio0; setExclusivewnerthread (nulo); } void protegido setExclusivewnerthread (thread thread) {exclusivewnerthread = thread; } thread final protegido getExclusivewnerthread {return exclusivewnerthread; } abstrato protegido void desbloqueio0; / ** * Implementação do bloqueio de bloqueio de aquisição * * @param usetimeout * @param time * @param unidade * @param interrompe se deve responder às interrupções * @return * @throws interruptedException */ protegido abstrato bloqueio booleano (interrotações booleanas, unidade de tempo longa, unidade de tempo, interrupção booleana). Com base na implementação final do REDIS, o código -chave para adquirir e liberar o bloqueio está no método lock e unlock0 do método desta classe. Você só pode olhar para esses dois métodos e escrever um completamente sozinho:
package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis;/** * <pre> * Distributed lock implemented by SETNX operation based on Redis* * It is best to use lock(long time, TimeUnit unit) when acquiring locks to avoid network problems causing threads to block all the time* * <a href = "http://redis.io/commands/setnx"> SetNC Operação Referência </a> * </pre> * * @author lixiaohui * */classe pública RedisBasedDististedLock estende abstrato {private jedis jedis; // o nome do bloqueio de trava protegido LockKey; // a duração da validade do bloqueio (MS) protegiu lockexpiros longos; public RedisBasedDistributedLock (Jedis Jedis, String LockKey, Long Lockexpires) {this.jedis = Jedis; this.lockKey = LockKey; this.lockexpires = lockexpires; } // Implementação do bloqueio de bloqueio de bloqueio de bloqueio Boolean Lock (Boolean Usetimeout, Long Time, TimeUnit, interrupção booleana) lança interruptedException {if (interrupt) {checkInterRrupção; } long start = system.currenttimemillis; longa timeout = unit.tomillis (tempo); // if! Usetimeout, então é inútil enquanto (usetimeout? isTimeout (start, timeout): true) {if (interrupt) {checkInterrupção; } long lockexpireTime = System.currenttimemillis + lockexpires + 1; // bloqueio de tempo limite string stringoflockexpireTime = string.valueof (lockexpireTime); if (jedis.setNx (LockKey, stringoflockexpireTime) == 1) {// obteve bloqueio // TODO obteve o bloqueio com sucesso, defina o identificador relevante bloqueado = true; setExclusivewnerthread (Thread.currentThread); retornar true; } String value = jedis.get (LockKey); if (value! = null && isTimeExpired (value)) {// bloqueio expire // assuma que vários threads (JVM não single) vêm aqui ao mesmo tempo string antiga antiga = jedis.getSet (LockKey, stringoflockexpireTime); // getSet é atômico //, mas o antigo valor obtido por cada fio quando se vem aqui é definitivamente impossível de ser o mesmo (porque o getSet é atômico) // O antigo valor obtido pela união ainda expirou, então significa que o bloqueio é obtido se obtido (idosa! setExclusivewnerthread (Thread.currentThread); retornar true; }} else {// TODO Lock não expirou, digite o próximo loop novamente a tentativa}} retornar false; } public boolean Trylock {Long LockexpireTime = System.CurrentTimemillis + Lockexpires + 1; // Time limite de bloqueio String stringoflockexpireTime = string.valueof (lockexpiretime); if (jedis.setnx (LockKey, stringoflockexpireTime) == 1) {// obtenha o bloqueio // adquira com sucesso o bloqueio, defina o identificador relevante bloqueado = true; setExclusivewnerthread (Thread.currentThread); retornar true; } String value = jedis.get (LockKey); if (value! = null && isTimeExpired (value)) {// bloqueio expire // assume vários threads (não apenas JVM) vêm aqui ao mesmo tempo string antiga antiga = jedis.getSet (LockKey, stringoflockexpireTime); // getSet é atômico //, mas o antigo valor obtido por cada thread quando vem aqui é definitivamente impossível (porque o getSet é atômico) // Se o antigo valor que você obtém ainda estiver expirado, isso significa que você tem o bloqueio se (antigo! setExclusivewnerthread (Thread.currentThread); retornar true; }} else {// TODO Lock não expirou, digite o próximo loop novamente a tentativa} retornar false; } /*** Consultas se esse bloqueio for mantido por qualquer thread. * * @return {@code true} Se qualquer thread mantém esse bloqueio e * {@code false} caso contrário */ public boolean islocked {if (bloqueado) {return true; } else {string value = jedis.get (LockKey); // TODO há realmente um problema aqui. Pense: quando o método get retornar o valor, suponha que o valor expirou, // neste momento, outro nó define o valor e o bloqueio é mantido por outro thread (o nó segura) e o próximo julgamento // não pode detectar essa situação. No entanto, esse problema não deve causar outros problemas, porque o objetivo desse método é // não é controle síncrono, é apenas um relatório do status de bloqueio. return! isTimeExpired (valor); }} @Override Protected void Unlock0 {// TODO determina se o bloqueio expira o valor da string = jedis.get (LockKey); if (! isTimeExirird (valor)) {Dounlock; }} Private void checkInterrupção lança interruptEdException {if (thread.currentThread.isinterrupted) {lança new interruptEdException; }} Private boolean isTimeExpired (String value) {return Long.Parselong (Value) <System.CurrentTimemillis; } Private Boolean ISTimeout (Start Long, Long Timeout) {return start + timeout> system.currenttimemillis; } private void Dounlock {Jedis.del (LockKey); }} Se você alterar o método de implementação no futuro (como zookeeper , etc.), poderá herdar diretamente AbstractLock e implementar o ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) e o Método unlock0 (SOBRETILLED ABSTRACÇÃO)
teste
Simule o produtor de ID global e projete uma classe IDGenerator . Esta classe é responsável por gerar IDs incrementais globais. Seu código é o seguinte:
pacote cc.lixiaohui.lock; importar java.math.biginteger; importar java.util.concurrent.timeunit;/** * simular a geração de identificação * @author lixiaohui * */public class idGenerator {private static biginteger id = bigInteger.valueof bloqueio final privado de bloqueio; Biginteger final estático privado incremento = biginteger.valueof (1); public IDGenerator (bloqueio de bloqueio) {this.lock = bloqueio; } public String getAndIncrement {if (Lock.TryLock (3, TimeUnit.Seconds)) {try {// TODO Obtenha o bloqueio aqui e acesse o recurso de área crítica retornar getAndIncrement0; } finalmente {Lock.unlock; }} retornar nulo; // retorna getAndIncrement0; } private string getAndincrement0 {string s = id.toString; id = id.add (incremento); retorno s; }} Teste a lógica principal: dois threads são abertos na mesma JVM em um loop morto (não há intervalo entre os loops, se houver, o teste não terá sentido) para obter ID (eu não sou um loop morto, mas corra por 20 anos), obtenha o ID e guarde -o no mesmo Set . Antes de ser armazenado, verifique se o ID existe no set . Se já existir, deixe os dois threads pararem. Se o programa puder executar 20 segundos normalmente, significa que esse bloqueio distribuído pode atender aos requisitos. O efeito de tal teste deve ser o mesmo que o de diferentes JVMs (ou seja, em um ambiente real distribuído). A seguir, o código da classe de teste:
pacote cc.lixiaohui.distributedlock.distributedlock; importar java.util.hashset; importar java.util.set; importar org.junit.test; import redis.cliients.jedis.jedis; importação c.lixiaohui.lock.idGenerlator; ImportGenerl; cc.lixiaohui.lock.redisBasedDistributedLock; public Classe IDGeneratorTest {private estático conjunto <String> generatedIds = new HashSet <String>; String final estática privada Lock_key = "Lock.lock"; Private estático final Long Lock_expire = 5 * 1000; @Test Public void Test Throws InterruptException {Jedis Jedis1 = New Jedis ("Localhost", 6379); Lock Lock1 = new RedisBasedDistributedLock (JEDIS1, Lock_Key, Lock_Expire); IDGenerator G1 = novo IDGenerator (Lock1); Idconsumemission consume1 = nova idconsumemission (g1, "consume1"); Jedis Jedis2 = New Jedis ("Localhost", 6379); Lock Lock2 = new RedisBasedDistributedLock (JEDIS2, Lock_Key, Lock_Expire); IdGenerator g2 = novo idGenerator (Lock2); Idconsumemission consume2 = nova idconsumemissão (g2, "consume2"); Thread t1 = novo thread (consume1); Tópico T2 = novo thread (consume2); t1.start; t2.start; Thread.sleep (20 * 1000); // Deixe dois threads executarem por 20 segundos IDCOnsumemission.stop; T1.Join; T2.Join; } tempo de string estático {return string.valueof (system.currenttimemillis / 1000); } classe estática IDCOnsumemission implementa Runnable {private IDGenerator Idgenerator; nome de string privado; parada booleana volátil estática privada; public IdConsumemission (IDGenerator IdGenerator, Nome da String) {this.idgenerator = IDGenerator; this.name = nome; } public static void pare {stop = true; } public void run {System.out.println (time + ": consume" + nome + "start"); while (! Stop) {string id = idGenerator.getAndIncrement; if (generatedIds.contains (id)) {System.out.println (time + ": ID duplicado gerado, id =" + id); stop = true; continuar; } generatedIds.add (id); System.out.println (tempo + ": consume" + nome + "adicione id =" + id); } System.out.println (time + ": consume" + nome + "feito"); }}}Para ficar claro, a maneira como eu paro dois tópicos aqui não é muito boa. Eu fiz isso por conveniência, porque é apenas um teste, por isso é melhor não fazer isso.
Resultados do teste
Há muitas coisas impressas nos 20 anos. Os impressos na frente estão clear e disponíveis apenas quando a corrida está quase terminada. A captura de tela abaixo. Isso mostra que esse bloqueio funciona normalmente:
Quando IDGererator não está bloqueado (ou seja, o método getAndIncrement do IDGererator não o trava quando obtém id internamente), o teste não passará e há uma probabilidade muito alta de que ele pare no meio do caminho. A seguir, são apresentados os resultados do teste quando o bloqueio não está bloqueado:
Isso leva menos de 1 segundo:
Este leva menos de 1 segundo:
Conclusão
Ok, o acima é sobre o Java implementar bloqueios distribuídos com base no Redis. Se você encontrar algum problema, espera corrigi -los. Espero que este artigo possa ajudá -lo a estudar e trabalhar. Se você tiver alguma dúvida, pode deixar uma mensagem para se comunicar.