Semaphore is a class commonly used in JUC package. It is an application of AQS sharing mode. It can allow multiple threads to operate on shared resources at the same time and can effectively control the number of concurrency. It can achieve good traffic control. Semaphore provides a concept of a license, which can be regarded as a bus ticket. Only those who successfully obtain the ticket can get on the bus. There are a certain number of tickets and it is impossible to issue them without restrictions, which will lead to overloading of the bus. So when the ticket is issued (the bus is full), the others can only wait for the next train. If someone gets off the bus halfway through, his position will be free, so if others want to get on the bus at this time, they can get tickets again. Various pools can be implemented using Semaphore. At the end of this article, we will write a simple database connection pool. First, let’s take a look at the constructor of Semaphore.
//Constructor 1public Semaphore(int permits) { sync = new NonfairSync(permits);}//Constructor 2public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits);}Semaphore provides two parameter-free constructors, but no parameter-free constructors are provided. Both constructors must pass in an initial number of licenses. The semaphore constructed using constructor 1 will be obtained in a non-fair way when obtaining the license. Using constructor 2 can specify the method of obtaining the license through parameters (fair or unfair). Semaphore mainly provides two types of APIs to the outside world, obtaining licenses and releasing licenses. The default is to obtain and release one license, and parameters can also be passed in to obtain and release multiple licenses at the same time. In this article, we will only talk about the situation of obtaining and releasing one license each time.
1. Obtain a license
//Get a license (response interrupt) public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}//Get a license (not responding to interrupt) public void acquireUninterruptibly() { sync.acquireShared(1);}//Try to obtain a license (nonfair acquisition) public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0;}//Try to obtain a license (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}The above API is the default license acquisition operation provided by Semaphore. Only obtaining one license at a time is also a common situation in real life. In addition to direct fetching, it also provides attempt to fetching. The direct fetching operation may block the thread after failure, while attempting to fetching will not. It should also be noted that the tryAcquire method is used to try to obtain it in an unfair way. What we often use in normal times is to obtain a license. Let’s take a look at how it is obtained. You can see that the acquire method directly calls sync.acquireSharedInterruptibly(1). This method is the method in AQS. We once talked about the AQS source code series articles. Let's review it again.
//Acquiring the lock in interruptable mode (shared mode) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //First determine whether the thread is interrupted, if so, throw an exception if (Thread.interrupted()) { throw new InterruptedException(); } //1. Try to acquire the lock if (tryAcquireShared(arg) < 0) { //2. If the acquisition fails, enter the method doAcquireSharedInterruptibly(arg); }}The first method acquireSharedInterruptibly is to call the tryAcquireShared method to try to obtain. tryAcquireShared is an abstract method in AQS. The two derived classes FairSync and NonfairSync implement the logic of this method. FairSync implements the logic of fair acquisition, while NonfairSync implements the logic of non-fair acquisition.
abstract static class Sync extends AbstractQueuedSynchronizer { //Try to obtain final int nonfairTryAcquireShared(int acquires) { for (;;) { //Get available licenses int available = getState(); //Get remaining licenses int remaining = available - acquires; //1. If remaining less than 0, return remaining directly //2. If remaining greater than 0, update the synchronization status first and then return remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } }}//Nonfairsync static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } //Try to obtain a license protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}//Fair synchronizer static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //Try to obtain the license protected int tryAcquireShared(int acquires) { for (;;) { //Judge whether there is anyone in front of the synchronization queue if (hasQueuedPredecessors()) { //If there is any, return -1, indicating that the attempt to obtain failed return -1; } //Get available licenses int available = getState(); //Get the remaining licenses int remaining = available - acquires; //1. If remaining less than 0, return directly to remaining //2. If remaining is greater than 0, the synchronization status will be updated first and then returned to remaining if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } }}It should be noted here that the tryAcquireShared method of NonfairSync directly calls the nonfairTryAcquireShared method, which is in the parent class Sync. The logic of the non-fair acquisition lock is to first take out the current synchronization state (synchronous state represents the number of licenses), subtract the parameter of the current synchronization state. If the result is not less than 0, it is proved that there are still available licenses, then the value of the synchronization state is directly updated using CAS operation, and finally, the result value will be returned regardless of whether the result is less than 0. Here we need to understand the meaning of the return value of the tryAcquireShared method. Returning a negative number means the acquisition failed, zero means the current thread is successfully acquired but the subsequent thread cannot be obtained anymore, and positive number means the current thread is successfully acquired and the subsequent thread can also be obtained. Let's look at the code of the acquireSharedInterruptibly method.
//Acquiring locks in interruptible mode (shared mode) public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //First determine whether the thread is interrupted, if so, throw an exception if (Thread.interrupted()) { throw new InterruptedException(); } //1. Try to acquire the lock//Negative number: indicates that the acquisition failed//Zero value: indicates that the current thread is successfully acquired, but the subsequent thread can no longer obtain //Positive number: indicates that the current thread is successfully acquired, and the subsequent thread can also obtain success if (tryAcquireShared(arg) < 0) { //2. If the acquisition fails, enter the method doAcquireSharedInterruptibly(arg); }}If the returned remaining is less than 0, it means the acquisition failed. Therefore, tryAcquireShared(arg) < 0 is true, so the doAcquireSharedInterruptibly method will be called next. When we talked about AQS, it will wrap the current thread into a node and put it into the tail of the synchronization queue, and it is possible to suspend the thread. This is also the reason why threads will queue up and block when remaining less than 0. If the returned remaining>=0 means that the current thread has been successfully acquired. Therefore, tryAcquireShared(arg) < 0 is a flase, so the doAcquireSharedInterruptibly method will no longer be called to block the current thread. The above is the entire logic of unfair acquisition. When fair acquisition, you only need to call the hasQueuedPredecessors method before this to determine whether someone is queuing in the synchronization queue. If so, return -1 directly indicates that the acquisition failed, otherwise the following steps are continued as unfair acquisition.
2. Release the license
//Release a license public void release() { sync.releaseShared(1);}Calling the release method is to release a license. Its operation is very simple, so we call the releaseShared method of AQS. Let's take a look at this method.
//Release lock operation (shared mode) public final boolean releaseShared(int arg) { //1. Try to release the lock if (tryReleaseShared(arg)) { //2. If the release is successful, wake up other threads doReleaseShared(); return true; } return false;}The releaseShared method of AQS first calls the tryReleaseShared method to try to release the lock. The implementation logic of this method is in the subclass Sync.
abstract static class Sync extends AbstractQueuedSynchronizer { ... //Try to release the operation protected final boolean tryReleaseShared(int releases) { for (;;) { //Get the current synchronization state int current = getState(); //Plus the current synchronization state as follows int next = current + releases; //If the addition result is less than the current synchronization state, an error will be reported if (next < current) { throw new Error("Maximum permit count exceeded"); } //Update the value of the synchronization state in CAS mode, and return true if the update is successful, otherwise continue to loop if (compareAndSetState(current, next)) { return true; } } } ...}You can see that the tryReleaseShared method uses a for loop to spin. First, get the synchronization state, add the incoming parameters, and then update the synchronization state in CAS. If the update is successful, return true and jump out of the method. Otherwise, the loop will continue until it is successful. This is the process of Semaphore releasing the license.
3. Write a connection pool manually
The Semaphore code is not very complicated. The commonly used operation is to obtain and release a license. The implementation logic of these operations is relatively simple, but this does not hinder the widespread application of Semaphore. Next, we will use Semaphore to implement a simple database connection pool. Through this example, we hope readers can have a deeper understanding of Semaphore's use.
public class ConnectPool { //Connection pool size private int size; //Database connection collection private Connect[] connects; //Connection status flag private boolean[] connectFlag; //Remaining number of available connections private volatile int available; //Semaphore private Semaphore semaphore; //Constructor public ConnectPool(int size) { this.size = size; this.available = size; semaphore = new Semaphore(size, true); connects = new Connect[size]; connectFlag = new boolean[size]; initConnects(); } //Initialize the connection private void initConnects() { //Generate a specified number of database connections for(int i = 0; i < this.size; i++) { connects[i] = new Connect(); } } //Get database connection private synchronized Connect getConnect(){ for(int i = 0; i < connectFlag.length; i++) { //Transfer the collection to find unused connections if(!connectFlag[i]) { //Set the connection to in use connectFlag[i] = true; //Subtract the number of available connections available--; System.out.println("【"+Thread.currentThread().getName()+"】 to get the number of connections remaining: " + available); //Return the connection reference return connects[i]; } } return null; } //Get a connection public Connect openConnect() throws InterruptedException { //Get license semaphore.acquire(); //Get database connection return getConnect(); } //Release a connection public synchronized void release(Connect connect) { for(int i = 0; i < this.size; i++) { if(connect == connects[i]){ //Set the connection to not used connectFlag[i] = false; //Add 1 available connection number; System.out.println("【"+Thread.currentThread().getName()+"] to release the remaining connection number: " + available); //Release license semaphore.release(); } } } //Add number of available connections public int available() { return available; } }Test code:
public class TestThread extends Thread { private static ConnectPool pool = new ConnectPool(3); @Override public void run() { try { Connect connect = pool.openConnect(); Thread.sleep(100); // Take a break pool.release(connect); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { for(int i = 0; i < 10; i++) { new TestThread().start(); } }}Test results:
We use an array to store references to database connections. When initializing the connection pool, we will call the initConnects method to create a specified number of database connections and store their references in the array. In addition, there is an array of the same size to record whether the connection is available. Whenever an external thread requests to obtain a connection, first call the semaphore.acquire() method to obtain a license, then set the connection status to in use, and finally return the reference to the connection. The number of licenses is determined by the parameters passed in during construction. The number of licenses is reduced by 1 every time the semaphore.acquire() method is called. When the number is reduced to 0, it means that there is no connection available. At this time, if other threads get it again, it will be blocked. Whenever a thread releases a connection, semaphore.release() will be called to release the license. At this time, the total number of licenses will increase again, which means that the number of available connections has increased. Then the previously blocked thread will wake up and continue to obtain the connection. At this time, you can successfully obtain the connection by obtaining it again. In the test example, a connection pool of 3 connections is initialized. We can see from the test results that whenever a thread obtains a connection, the number of connections remaining will be reduced by 1. When the thread reduces to 0, other threads can no longer obtain it. At this time, you must wait for a thread to release the connection before continuing to obtain it. You can see that the number of remaining connections always changes between 0 and 3, which means that our test was successful.
The above is all the content of this article. I hope it will be helpful to everyone's learning and I hope everyone will support Wulin.com more.