A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
CyclicBarrier(int parties, Runnable barrierAction) Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.
Method
await() Waits until all parties have invoked await on this barrier.
Returns: the arrival index of the current thread, where index getParties() - 1 indicates the first to arrive and zero indicates the last to arrive
Throws:
InterruptedException - if the current thread was interrupted while waiting
BrokenBarrierException - if another thread was interrupted or timed out while the current thread was waiting, or the barrier was reset, or the barrier was broken when await was called, or the barrier action (if present) failed due to an exception
await(long timeout, TimeUnit unit) Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.
Semaphore
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource.
blockingQueue.add("a") = true blockingQueue.add("b") = true blockingQueue.add("c") = true Exception in thread "main" java.lang.IllegalStateException: Queue full at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98) at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:326) at com.wuyue.BlockingQueueDemo.main(BlockingQueueDemo.java:21
blockingQueue.add("a") = true blockingQueue.add("b") = true blockingQueue.add("c") = true blockingQueue.remove() = a blockingQueue.remove() = b blockingQueue.remove() = c Exception in thread "main" java.util.NoSuchElementException at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117) at com.wuyue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ publicstatic ExecutorService newFixedThreadPool(int nThreads) { returnnewThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>()); }
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ publicstatic ExecutorService newSingleThreadExecutor() { returnnewFinalizableDelegatedExecutorService (newThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, newLinkedBlockingQueue<Runnable>())); }
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ publicstatic ExecutorService newCachedThreadPool() { returnnewThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, newSynchronousQueue<Runnable>()); }
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) thrownewIllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) thrownewNullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
Exception in thread "0" java.util.ConcurrentModificationException at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1043) at java.base/java.util.ArrayList$Itr.next(ArrayList.java:997) at java.base/java.util.AbstractCollection.toString(AbstractCollection.java:472) at java.base/java.lang.String.valueOf(String.java:2951) at java.base/java.io.PrintStream.println(PrintStream.java:897) at com.wuyue.NotSafeListDemo.lambda$main$0(NotSafeListDemo.java:20) at java.base/java.lang.Thread.run(Thread.java:834)