Use wait() and notify() to achieve inter-thread collaboration
1. wait() and notify()/notifyAll()
When sleep() and yield() are called, the lock is not released, and calling wait() will release the lock. In this way another task (thread) can obtain the lock of the current object, thus entering its synchronized method. You can resume execution from wait() through notify()/notifyAll() or the time expires.
Wait(), notify(), and notifyAll() can only be called in the synchronization control method or synchronization block. If these methods are called in an asynchronous method, an IllegalMonitorStateException exception will be thrown at runtime.
2. Simulate the wake-up of multiple threads by a single thread
Simulate collaboration between threads. The Game class has 2 synchronization methods prepare() and go(). The flag start is used to determine whether the current thread needs wait(). The instance of the Game class starts all Athele class instances first and enter the wait() state. After a period of time, change the flag bit and notifyAll() all Athele threads in the wait state.
Game.java
package concurrency;import java.util.Collection;import java.util.Collections;import java.util.HashSet;import java.util.Iterator;import java.util.Set;class Athlete implements Runnable { private final int id; private Game game; public Athlete(int id, Game game) { this.id = id; this.game = game; } public boolean equals(Object o) { if (!(o instanceof Athlete)) return false; Athlete athlete = (Athlete) o; return id == athlete.id; } public String toString() { return "Athlete<" + id + ">"; } public int hashCode() { return new Integer(id).hashCode(); } public void run() { try { game.prepare(this); } catch (InterruptedException e) { System.out.println(this + " quit the game"); } } } public class Game implements Runnable { private Set<Athlete> players = new HashSet<Athlete>(); private boolean start = false; public void addPlayer(Athlete one) { players.add(one); } public void removePlayer(Athlete one) { players.remove(one); } public Collection<Athlete> getPlayers() { return Collections.unmodifiableSet(players); } public void prepare(Athlete athlete) throws InterruptedException { System.out.println(athlete + " ready!"); synchronized (this) { while (!start) wait(); if (start) System.out.println(athlete + " go!"); } } public synchronized void go() { notifyAll(); } public void ready() { Iterator<Athlete> iter = getPlayers().iterator(); while (iter.hasNext()) new Thread(iter.next()).start(); } public void run() { start = false; System.out.println("Ready..."); System.out.println("Ready..."); System.out.println("Ready..."); ready(); start = true; System.out.println("Go!"); go(); } public static void main(String[] args) { Game game = new Game(); for (int i = 0; i < 10; i++) game.addPlayer(new Athlete(i, game)); new Thread(game).start(); }} result:
Ready...Ready...Ready...Athlete<0> ready!Athlete<1> ready!Athlete<2> ready!Athlete<3> ready!Athlete<4> ready!Athlete<5> ready!Athlete<6> ready!Athlete<7> ready!Athlete<8> ready!Athlete<9> ready!Go!Athlete<9> go!Athlete<8> go!Athlete<7> go!Athlete<6> go!Athlete<5> go!Athlete<4> go!Athlete<3> go!Athlete<2> go!Athlete<1> go!Athlete<0> go!
3. Simulate the busy waiting process
An instance of the MyObject class is the observer. When an observation event occurs, it will notify an instance of the Monitor class (the way it is to change a flag). The instance of this Monitor class constantly checks whether the flag bit changes by waiting.
BusyWaiting.java
import java.util.concurrent.TimeUnit;class MyObject implements Runnable { private Monitor monitor; public MyObject(Monitor monitor) { this.monitor = monitor; } public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println("i'm going."); monitor.gotMessage(); } catch (InterruptedException e) { e.printStackTrace(); } }}class Monitor implements Runnable { private volatile boolean go = false; public void gotMessage() throws InterruptedException { go = true; } public void watching() { while (go == false) ; System.out.println("He has gone."); } public void run() { watching(); }}public class BusyWaiting { public static void main(String[] args) { Monitor monitor = new Monitor(); MyObject o = new MyObject(monitor); new Thread(o).start(); new Thread(monitor).start(); }} result:
i'm going.He has gone.
4. Use wait() and notify() to rewrite the above example
The following example replaces the busy waiting mechanism through wait(). When a notification message is received, notify the current Monitor class thread.
Wait.java
package concurrency.wait;import java.util.concurrent.TimeUnit;class MyObject implements Runnable { private Monitor monitor; public MyObject(Monitor monitor) { this.monitor = monitor; } Start the thread regularly
Here are two ways to start a thread after a specified time. First, it is implemented through java.util.concurrent.DelayQueue; second, it is implemented through java.util.concurrent.ScheduledThreadPoolExecutor.
1. java.util.concurrent.DelayQueue
The class DelayQueue is an unbounded blocking queue from which elements can be extracted only when the delay expires. It accepts instances that implement the Delayed interface as elements.
<<interface>>Delayed.java
package java.util.concurrent;import java.util.*;public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit);}getDelay() returns the remaining delay time associated with this object, expressed in given time units. The implementation of this interface must define a compareTo method that provides a consistent sort with the getDelay method of this interface.
The head of the DelayQueue queue is the Delayed element with the longest storage time after the delay expires. Expiration occurs when the getDelay(TimeUnit.NANOSECONDS) method of an element returns a value less than or equal to 0.
2. Design queues with time delay characteristics
The class DelayedTasker maintains a DelayQueue<DelayedTask> queue, where DelayedTask implements the Delayed interface and is defined by an internal class. Both external classes and internal classes implement the Runnable interface. For external classes, its run method takes out tasks in the queue in sequence according to the defined time, and these tasks are instances of internal classes. The run method of internal classes defines the specific logic of each thread.
The essence of this design is to define a list of thread tasks with time characteristics, and the list can be of any length. Specify the startup time each time you add a task.
DelayedTasker.java
package com.zj.timedtask;import static java.util.concurrent.TimeUnit.SECONDS;import static java.util.concurrent.TimeUnit.NANOSECONDS;import java.util.Collection;import java.util.Collections;import java.util.Collections;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class DelayedTasker implements Runnable { DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>(); public void addTask(DelayedTask e) { queue.put(e); } public void removeTask() { queue.poll(); } public Collection<DelayedTask> getAllTasks() { return Collections.unmodifiableCollection(queue); } public int getTaskQuantity() { return queue.size(); } public void run() { while (!queue.isEmpty()) try { queue.take().run(); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println("Finished DelayedTask"); } public static class DelayedTask implements Delayed, Runnable { private static int counter = 0; private final int id = counter++; private final int delta; private final long trigger; public DelayedTask(int delayInSeconds) { delta = delayInSeconds; trigger = System.nanoTime() + NANOSECONDS.convert(delta, SECONDS); } public long getDelay(TimeUnit unit) { return unit.convert(trigger - System.nanoTime(), NANOSECONDS); } public int compareTo(Delayed arg) { DelayedTask that = (DelayedTask) arg; if (trigger < that.trigger) return -1; if (trigger > that.trigger) return 1; return 0; } public void run() { //run all that you want to do System.out.println(this); } public String toString() { return "[" + delta + "s]" + "Task" + id; } } public static void main(String[] args) { Random rand = new Random(); ExecutorService exec = Executors.newCachedThreadPool(); DelayedTasker tasker = new DelayedTasker(); for (int i = 0; i < 10; i++) tasker.addTask(new DelayedTask(rand.nextInt(5))); exec.execute(tasker); exec.shutdown(); }} result:
[0s]Task 1[0s]Task 2[0s]Task 3[1s]Task 6[2s]Task 5[3s]Task 8[4s]Task 0[4s]Task 4[4s]Task 7[4s]Task 9FinishedDelayedTask
3. java.util.concurrent.ScheduledThreadPoolExecutor
This class can be scheduled to run tasks (threads) after a given delay, or to perform tasks regularly (repeat). In the constructor, you need to know the size of the thread pool. The main method is:
[1] schedule
public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit)
Creates and performs a one-time operation enabled after a given delay.
Designated by:
- schedule in interface ScheduledExecutorService;
parameter:
-command - The task to be performed;
-delay - The time to delay execution from now on;
-unit - The time unit of delay parameters;
return:
- Indicates the ScheduledFuture that suspends the task and its get() method will return null after completion.
[2] scheduleAtFixedRate
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit)
Create and execute a periodic operation that is first enabled after a given initial delay, with subsequent operations having a given period; that is, it will start after initialDelay, then after initialDelay+period, then after initialDelay + 2 * period, and so on. If any execution of a task encounters an exception, subsequent execution will be cancelled. Otherwise, the task can only be terminated by executing the cancel or termination method of the program. If any of the executions of this task take longer than its cycle, subsequent execution will be postponed, but not simultaneously.
Designated by:
- scheduleAtFixedRate in interface ScheduledExecutorService;
parameter:
-command - The task to be performed;
-initialDelay - The delay time for the first execution;
-period - The period between continuous executions;
-unit - the time unit of initialDelay and period parameters;
return:
- Indicates that the ScheduledFuture of the suspended task is completed, and its get() method will throw an exception after it is cancelled.
4. Design thread executors with time delay characteristics
The class ScheduleTasked associates a ScheduledThreadPoolExcutor, which can specify the size of the thread pool. Know the thread and delay time through the schedule method, and close the thread pool through the shutdown method. The logic of specific tasks (threads) has certain flexibility (compared to the previous design, the previous design must define the thread's logic in advance, but the thread's specific logic design can be modified by inheritance or decoration).
ScheduleTasker.java
package com.zj.timedtask;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class ScheduleTasker { private int corePoolSize = 10; ScheduledThreadPoolExecutor scheduler; public ScheduleTasker() { scheduler = new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduleTasker(int quantity) { corePoolSize = quantity; scheduler = new ScheduledThreadPoolExecutor(corePoolSize); } public void schedule(Runnable event, long delay) { scheduler.schedule(event, delay, TimeUnit.SECONDS); } public void shutdown() { scheduler.shutdown(); } public static void main(String[] args) { ScheduleTasker tasker = new ScheduleTasker(); tasker.schedule(new Runnable() { public void run() { System.out.println("[1s]Task 1"); } }, 1); tasker.schedule(new Runnable() { public void run() { System.out.println("[2s]Task 2"); } }, 2); tasker.schedule(new Runnable() { public void run() { System.out.println("[4s]Task 3"); } }, 4); tasker.schedule(new Runnable() { public void run() { public void run() { System.out.println("[10s]Task 4"); } }, 10); tasker.shutdown(); }} result:
[1s]Task 1[2s]Task 2[4s]Task 3[10s]Task 4 public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println("i'm going."); monitor.gotMessage(); } catch (InterruptedException e) { e.printStackTrace(); } }} class Monitor implements Runnable { private volatile boolean go = false; public synchronized void gotMessage() throws InterruptedException { go = true; notify(); } public synchronized void watching() throws InterruptedException { while (go == false) wait(); System.out.println("He has gone."); } public void run() { try { watching(); } catch (InterruptedException e) { e.printStackTrace(); } }}public class Wait { public static void main(String[] args) { Monitor monitor = new Monitor(); MyObject o = new MyObject(monitor); new Thread(o).start(); new Thread(monitor).start(); }} result:
i'm going.He has gone.