序文
単一のJVM内の同期は簡単に対処できます。 JDKが提供するロックを直接使用するだけですが、クロスプロセスの同期は間違いなく不可能です。この場合、サードパーティに頼る必要があります。ここでRedisを使用していますが、もちろん他に多くの実装方法があります。実際、Redisの実装に基づく原則は非常に簡単です。コードを読む前に、最初に原則を確認することをお勧めします。コードを読んだ後、理解しやすいはずです。
JDKのjava.util.concurrent.locks.Lockインターフェイスは実装していませんが、JDKにnewCondition方法があるため、カスタマイズします。当面は実装していません。このロックは、5つのロックメソッドのバリエーションを提供します。ロックを取得するために使用するものを選択できます。私の考えは、タイムアウトリターンでメソッドを使用することが最善であるということです。これが当てはまらない場合、Redisがハングしている場合、スレッドは常に死んだループになります(これについては、これについてさらに最適化する必要があります。Redisがハングする場合、Jedisの操作は間違いなく例外を投げます。
パッケージcc.lixiaohui.lock; Import java.util.concurrent.timeunit; public interface lock { / ***ブロッキング取得ロック、割り込み* / voidロックに応答しません。 / ** * cotusitionロックのブロック、割り込みに応答しません * * @throws furtrededexception */ void lockinterminteribly interurtedexception; / ***ロックを取得してみて、すぐに取得せずに戻ります。 / ** *ブロッキングの取得ロックはタイムアウトによって自動的に返され、割り込みに応答しません * @param unit * @return {@code true}ロックが正常に取得された場合、{@code false}指定された時間内にロックが取得されない場合 * * */ boolean tryLock(longunit unit); / ***タイムアウトによって自動的に返されたブロッキング取得ロック、応答割り込み** @param time* @param unit* @return {@code true}ロックが正常に取得された場合、{@code false}指定された時間内にロックが検索されない場合* @throws untersedexception untexted the lock intruringeded*/ booleanlockindexedexedexcection中断exception; / ***ロックをリリース*/ void nonlock; }その抽象的な実装を見てください:
パッケージcc.lixiaohui.lock; Import java.util.concurrent.timeunit;/***ロックのスケルトン実装では、ロックを取得するための実際のステップはサブクラスによって実装されます。 * * @author lixiaohui * * /public abstract class abststractlockはlock { /** * <pre> *ここで視界を保証する必要があるかどうかは議論する価値があります。それは分散ロックです。 * 1。同じJVMの複数のスレッドが異なるロックオブジェクトを使用することも可能です。視界を保証する必要があります。 * </ pre> */保護された揮発性ブール波ロック。 / ** *現在JVMにロックを保持しているスレッド(1つがある場合) public void lock {try {lock(false、0、null、false); } catch(arturtedexception e){// todo adignore}} public void lock intructibly throws interteredexception {lock(false、0、null、true); } public boolean tryLock(Long Time、TimeUnit Unit){try {return lock(true、time、unit、false); } catch(arturtedexception e){// todo無視} return false; } public boolean tryLockInterrumdibly(long time、timeUnit unit)throws arturnedexception {return lock(true、time、unit、true); } public void nonlock {// TODO現在のスレッドがロックを保持するかどうかを確認します(thread.currentThread!= getExclusiveOntherThread){新しいIllegalMonitorStateException( "現在のスレッドはロックを保持しない"); } Unlock0; SetExclusiveOwnerThread(null); } protected void setexclusiveOnthRead(スレッドスレッド){排他的OwnErthRead = thread; }保護された最終スレッドgetExclusiveOwnErthRead {return exclusiveOwnERTHREAD; }保護された抽象void lrock0; / ** *ブロッキング取得ロックの実装 * * @param usetimeout * @param time * @param unit * @param interurts unturts untrustsに応答するかどうか * @return * @throws中断boolean lock(boolean usetimeout、long -unit unit、boolean untruption)。 Redisの最終実装に基づいて、ロックを取得してリリースするための重要なコードは、このクラスのlockメソッドとunlock0 。これらの2つの方法しか見て、自分で完全に書くことができます。
パッケージCC.LIXIAOHUI.LOCK; IMPORT JAVA.UTIL.CONCURRENT.TIMEUNIT; IMPORT REDIS.CLIENTS.CLIENTS.JEDIS.JEDIS;/** * <PRE> * Redisに基づいてsetNX操作によって実装された分散ロック * *ロック(長い時間、TimeUnitユニット)を使用するのが最善です。 href = "http://redis.io/commands/setnx"> setnc操作リファレンス</a> * </pre> * * @author lixiaohui * */public class redisbaseddistributedlock extends abstractlock {private jedis jedis; //ロック保護された文字列ロックキーの名前。 //ロックの有効期間(MS)は長いlockexpiresを保護しました。 public redisbaseddistributedlock(jedis jedis、string lockkey、long lockexpires){this.jedis = jedis; this.lockkey = lockkey; this.lockexpires = lockexpires; } //ブロッキング取得ロックの実装保護されたブールロック(Boolean usetimeout、long time、timeunit unit、boolean interrupt)Sthrows arturtedexception {if(interrupt){checkinterruption; } long start = system.currenttimemillis; long timeout = unit.tomillis(time); // if!usetimeout、それは役に立たないwhile(usetimeout?istimeout(start、timeout):true){if(arturts){checkinterruption; } long lockexpiretime = system.currenttimemillis + lockexpires + 1; //ロックタイムアウト文字列stringoflockexpiretime = string.valueof(lockexpiretime); if(jedis.setnx(lockkey、stringoflockexpiretime)== 1){// doceaded lock // todoはロックを正常に取得し、関連する識別子をlocked = trueに設定します。 setExclusiveOwnErthRead(thread.CurrentThread); trueを返します。 } string value = jedis.get(lockkey); if(value!= null && istimeexpired(value)){//ロックは有効期限が切れている//複数のスレッド(非シングルjvm)が同時にここに来ると仮定します。 // getsetはAtomic //ですが、ここに来るときに各スレッドで取得された古い値は間違いなく同じであることが不可能です(getsetはアトミックであるため) setExclusiveOwnErthRead(thread.CurrentThread); trueを返します。 }} else {// todo lockの有効期限が切れない、次のループ再試行を入力}} falseを返します。 } public boolean tryLock {long lockexpiretime = system.currenttimemillis + lockexpires + 1; //ロックタイムアウトString stringoflockexpiretime = string.valueof(lockexpiretime); if(jedis.setnx(lockkey、stringoflockexpiretime)== 1){// lock // todoを取得してロックを正常に取得し、関連する識別子をlocked = trueに設定します。 setExclusiveOwnErthRead(thread.CurrentThread); trueを返します。 } string value = jedis.get(lockkey); if(value!= null && istimeexpired(value)){//ロックは有効期限が切れている//複数のスレッド(単一のjvmではない)と同時にここに来るsing oldvalue = jedis.getset(lockkey、stringoflockexpiretime); // getsetはAtomic //ですが、ここに来るときに各スレッドで取得された古い値は間違いなく不可能です(getsetはアトミックであるため)// setExclusiveOwnErthRead(thread.CurrentThread); trueを返します。 }} else {// todo lockの有効期限が切れない、次のループ再試行を入力する} return false; } /***このロックが任意のスレッドによって保持されている場合はクエリ。 * * @return {@code true}いずれかのスレッドがこのロックと * {@code false}を保持している場合 */ public boolean islocked {if(locked){return true; } else {string value = jedis.get(lockkey); // todoここには実際に問題があります。考えてみてください:GETメソッドが値を返した場合、値が期限切れになっていると仮定します。//この時点で、別のノードが値を設定し、ロックは別のスレッド(ノードが保持)によって保持され、次の判断//この状況を検出できません。ただし、この問題は他の問題を引き起こすべきではありません。この方法の目的は//は同期制御ではないため、ロックステータスのレポートにすぎません。 return!istimeexpired(value); }} @Override Protected void lock0 {// todoは、ロックが文字列値= jedis.get(lockkey)の有効期限が切れるかどうかを決定します。 if(!istimeexpired(value)){dounlock; }} private void checkinterruption interruptedexception {if(thread.currentthread.isInterrupted){show new interruptedException; }} private boolean istimeexpired(string value){return long.parselong(value)<system.currenttimemillis; } private boolean istimeout(long start、long timeout){return start + timeout> system.currenttimemillis; } private void dounlock {jedis.del(lockkey); }}将来の実装方法( zookeeperなどなど)を変更すると、 AbstractLock直接継承してL ock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)およびunlock0メソッド(いわゆる抽象化)を実装できます。
テスト
グローバルID栽培者をシミュレートし、 IDGeneratorクラスを設計します。このクラスは、グローバルな増分IDを生成する責任があります。そのコードは次のとおりです。
パッケージcc.lixiaohui.lock; Import java.math.biginteger; Import java.util.concurrent.timeunit;/** * id Generation * @author lixiaohui * */public class idgenerator {private static biginteger id = biginteger.valueof(0);プライベートファイナルロックロック。プライベート静的最終biginteger increment = biginteger.valueof(1); public idgenerator(lock lock){this.lock = lock; } public string getandincrement {if(lock.trylock(3、timeunit.seconds)){try {//ここでロックを取得し、クリティカルエリアリソースリターンgetandincrement0にアクセスします。 }最後に{lock.unlock; }} nullを返します。 // getandincrement0を返します。 } private string getandincrement0 {string s = id.toString; id = id.add(increment); s; }}メインロジックのテスト:同じJVMで2つのスレッドを開き、致命的なループ(ループ間に間隔がありません。1つがある場合は、 IDが意味がありません)を取得して(私はデッドループではなく、20秒間実行されます)、IDを取得して同じSetに保存します。保存する前に、 setにIDが存在するかどうかを確認します。すでに存在する場合は、両方のスレッドを停止させます。プログラムが正常に20秒実行できる場合、この分散ロックが要件を満たすことができることを意味します。このようなテストの効果は、異なるJVMの効果と同じでなければなりません(つまり、実際の分散環境で)。以下は、テストクラスのコードです。
パッケージcc.lixiaohui.distributedlock.distributedlock; import java.util.hashset; import java.util.set; import org.junit.test; import redis.clients.jedis; cc.lixiaohui.lock.redisbaseddistributedlock; public class idgeneratortest {private static set <string> generateds = new Hashset <String>;プライベート静的最終文字列lock_key = "lock.lock"; private static final long lock_expire = 5 * 1000; @test public voidテストは、断続的なexceptionをスローします{jedis jedis1 = new jedis( "localhost"、6379); lock lock1 = new redisbaseddistributedlock(jedis1、lock_key、lock_expire); idgenerator G1 = new idgenerator(lock1); idconsumemission Consume1 = new idconsumemission(g1、 "Consume1"); Jedis Jedis2 = new Jedis( "LocalHost"、6379); lock lock2 = new redisbaseddistributedlock(jedis2、lock_key、lock_expire); idgenerator g2 = new idgenerator(lock2); idconsumemission Consume2 = new idconsumemission(g2、 "Consume2");スレッドT1 =新しいスレッド(CONSUME1);スレッドT2 =新しいスレッド(CONSUME2); t1.Start; t2.Start; thread.sleep(20 * 1000); // 2つのスレッドを20秒間実行しますidconsumemission.stop; T1.Join; T2.Join; } static string time {return string.valueof(system.currenttimemillis / 1000); } static class idconsumemission runnable {private idgenerator idgenerator;プライベート文字列名;プライベート静的揮発性ブールストップ; public idconsumemission(idgenerator idgenerator、string name){this.idgenerator = idgenerator; this.name = name; } public static void stop {stop = true; } public void run {system.out.println(time + ":cushume" + name + "start"); while(!stop){string id = idgenerator.getandincrement; if(genedIds.contains(id)){system.out.println(time + ":duplicate id generated、id =" + id); stop = true;続く; } genedids.add(id); system.out.println(time + ":cushume" + name + "add id =" + id); } system.out.println(time + ":Consume" + name + "done"); }}}明確にするために、ここで2つのスレッドを止める方法はあまり良くありません。それは単なるテストであるため、私はこれを便利にしたので、これを行わないことが最善です。
テスト結果
20代で印刷されたものが多すぎます。正面に印刷されたものはclearおり、実行がほぼ終了した場合にのみ利用可能です。以下のスクリーンショット。これは、このロックが正常に機能することを示しています。
IDGereratorがロックされていない場合(つまり、 IDGereratorのgetAndIncrementメソッドは、内部的にidを取得したときにロックしない)、テストは通過せず、途中で停止する可能性が非常に高いです。ロックがロックされていない場合のテスト結果は次のとおりです。
これには1秒未満かかります:
これには1秒もかかりません:
結論
OK、上記はすべて、Redisに基づいて分散ロックを実装するJavaに関するものです。問題がある場合は、それらを修正したいと考えています。この記事があなたが勉強して仕事をするのに役立つことを願っています。ご質問がある場合は、メッセージを残してコミュニケーションをとることができます。