【阻塞队列和并发修改异常】fail-fast和fail-safe,阻塞队列体系,3组方法,优先阻塞队列,延迟队列,链表无界阻塞,同步队列,非阻塞队列
1. 集合3种错误
迭代器的remove报错
- 最顶层报错
public interface Iterator<E> {default void remove() {throw new UnsupportedOperationException("remove");}
}
- 实现类 会报 这个错
public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E> { public void remove() {if (lastRet < 0)throw new IllegalStateException();}private class Itr implements Iterator<E> {int lastRet = -1;//默认为-1//执行第一步public void remove() {if (lastRet < 0)//报错1throw new IllegalStateException();//使用next后,进入这里checkForComodification();}try {AbstractList.this.remove(lastRet);if (lastRet < cursor)cursor--;lastRet = -1;expectedModCount = modCount;} catch (IndexOutOfBoundsException e) {throw new ConcurrentModificationException();}}
}
直接移除报错IllegalStateException
所以list会报 IllegalStateException
List<Integer> list = Arrays.asList(1, 2, 3);Iterator<Integer> iterator = list.iterator();//直接移除报错:Exception in thread "Thread-0" java.lang.IllegalStateExceptioniterator.remove();//依然是同样的错List<Integer> list = new ArrayList<>();
- 正常使用的逻辑
List<Integer> list = new ArrayList<>();list.add(1);Iterator<Integer> iterator = list.iterator();iterator.next();iterator.remove();
Arrays.asList的next后UnsupportedOperationException
- UnsupportedOperationException
List<Integer> list = Arrays.asList(1, 2, 3);//固定长度的ArrayListSystem.out.println(iterator.next());//Exception in thread "Thread-0" java.lang.UnsupportedOperationExceptioniterator.remove();
List<Integer> list = Arrays.asList(1, 2, 3);list.add(333);//Exception in thread "main" java.lang.UnsupportedOperationException//最终调用的源码如下。因为:Arrays的内部类ArrayList,没添加的方法。//会调用父类的 AbstractList,是 扔异常的public void add(int index, E element) {throw new UnsupportedOperationException();}
Arrays.asList(1,2,3) //是一个特殊的 ArrayListpublic static <T> List<T> asList(T... a) {return new ArrayList<>(a);}//核心是 Arrays的内部类private static class ArrayList<E> extends AbstractList<E>implements RandomAccess, java.io.Serializable{ArrayList(E[] array) {a = Objects.requireNonNull(array);}}
public void remove() {if (lastRet < 0)//报错1throw new IllegalStateException();//使用next后,进入这里。lastRet 为 0checkForComodification();try {//将会执行 remove的错误。即:最顶层的错误AbstractList.this.remove(lastRet);}}//这里并没有 并发异常,不走这个错误final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}
ConcurrentModificationException
List<Integer> list = new ArrayList();list.add(1);Iterator<Integer> iterator = list.iterator();//原来只有一个元素。hashNext第二次会返回 false//现在 hashNext 第二次依然返回truewhile (iterator.hasNext()) {//第二次 进入 next 方法。进入检查的方法,就会报错://Exception in thread "main" java.util.ConcurrentModificationExceptioniterator.next();//因为 这里加入了一个元素。list.add(4);}
public E next() {checkForComodification();}final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}
CopyOnWriteArrayList 最新的添加不显示
//底层是 cas,会复制一份// private transient volatile Object[] array;List<Integer> list = new CopyOnWriteArrayList<>();list.add(1);Iterator<Integer> iterator = list.iterator();//第二次循环,hasNext 会返回 falsewhile (iterator.hasNext()) {iterator.next();list.add(4);}
fail-fast和fail-safe
迭代器:Java Collection 定义了两种类型的迭代器:fail-fast和fail-safe。
fail-fast:如果集合在迭代时被修改,则立即抛出 ConcurrentModificationException 。
- java.util包下的集合类都是fail-fast,不能在多线程下发生并发修改(迭代过程中被修改)。
- 它们都在迭代器的实现中声明了一个transient int modCount,通过比较该值判断集合是否被修改。
protected transient int modCount = 0;int expectedModCount = modCount;final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}
fail-safe:如果集合在迭代时被修改,不会抛出 ConcurrentModificationException,
- 因为它们操作的是集合的克隆,而不是实际的集合。
- java.util.concurrent包下的集合都是fail-safe,可以在多线程下并发使用,并发修改。
并发修改:简单的说,并发修改在一个JVM内是多个线程同时修改同一个对象的过程。
2. 阻塞队列BlockingQueue
2.1 核心的3组方法
add remove offer poll加时间 put take
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
抛出异常
当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException: Queue full
当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
特殊值
插入方法,成功ture失败false
移除方法,成功返回出队列的元素,队列里面没有就返回null
一直阻塞
当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。
BlockingQueue<String> b = new ArrayBlockingQueue(2);System.out.println(b.add("1"));System.out.println(b.add("2"));//llegalStateException: Queue full//System.out.println(b.add("3"));//在开头的为:1System.out.println("在开头的为:" + b.element());//先进先出,先出来 1System.out.println(b.remove());System.out.println(b.remove());//java.util.NoSuchElementExceptionSystem.out.println(b.remove());
2.2 阻塞队列解释
- 如果队列已满(当队列有界时)或变为空,则队列 阻塞访问线程。
- 如果队列已满,则添加新元素将阻塞访问线程,除非有空间可用于新元素。
- 同样,如果队列为空,则访问元素会阻塞调用线程。
无界队列:在创建时未指定队列的大小。
- 因此,队列可以随着元素的添加而动态增长。但是,如果没有剩余内存,则会抛出java.lang.OutOfMemoryError。
2.3 体系
ArrayBlockingQueue是有界的 (固定大小)阻塞队列实现,
LinkedBlockingQueue是一个可选的有界阻塞队列实现,
PriorityBlockingQueue是一个可以按特定顺序消费项目,
- 支持优先级排序的无界阻塞队列。
DelayQueue 队列实现的延迟无界阻塞队列
LinkedTransferQueue提供了实现一种背压形式的能力。
- 由链表结构组成的无界阻塞队列
SynchronousQueue提供在线程之间交换数据的简单方法
-
只有一个不取走,不生产第二个。
-
Collection
- Queue 队列
- BlockingQueue 阻塞队列接口
- LinkedTransferQueue 由链表结构组成的无界阻塞队列。
- BlockingDeque
- LinkedBlockingDeque 由链表结构组成的双向阻塞队列
- Priority BlockingQueue 支持优先级排序的无界阻塞队列。
- *** SynchronousQueue**
- 不存储元素的阻塞队列,也即单个元素的队列。
- 只有一个不取走,不生产第二个。
- 不存储元素的阻塞队列,也即单个元素的队列。
- DelayQueue 使用优先级队列实现的延迟无界阻塞队列。
- *** Array** BlockingQueue
- 由数组结构组成的有界阻塞队列。
- *** Linked** BlockingQueue
- 由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
- BlockingQueue 阻塞队列接口
- Queue 队列
block
n.
大块,一块(木料、石头等);(四面临街的)街段,
v.
阻塞,堵塞(道路、管道等);遮住(视线)priority
英
/praɪˈɒrəti/
n.
优先事项,最重要的事;优先,优先权,重点;<英>优先通行权
adj.
优先的synchron ized
synchronous
英
/ˈsɪŋkrənəs/
adj.
同步的;同时的offer
英
/ˈɒfə(r)/
v.
提供,给予;提议,表示愿意(做某事);出(价),开(价);提出,作出;表示(爱、友谊等);
n.
主动提议,提供; 出价,报价; (商品的)特价,特惠;求婚poll
n.
民意调查,民意测验;选举投票,计票;投票数;
v.
对……进行民意测验(调查);获得(票数);(电信,计算机)轮询,探询;peek
英
/piːk/
v.
偷看,窥视;微露出,探出
n.
一瞥,偷偷地一看;(计算机)读取数据
优先队列PriorityBlockingQueue
- 总之会根据对象的 PriorityBlockingQueue决定权重
public class PriorityBlockingQueueTest {public static void main(String[] args) {//长度10的 排队阻塞队列PriorityBlockingQueue<Patient> pbq = new PriorityBlockingQueue<>(10);//创建3个 年轻人,放入for (int i = 0; i < 3; i++) {Patient patent = new Patient("Patent" + i, 20 + i);pbq.offer(patent);}//创建一个 年老的人Patient oldMan = new Patient("OldMan", 88);pbq.offer(oldMan);Patient patient = null;do {//逐步弹出patient = pbq.poll();if (patient != null) {System.out.println(patient.name + "挂号成功!");}} while (patient != null);}static class Patient implements Comparable<Patient> {private String name;private Integer age;private long waitingTime;public Patient(String name, Integer age) {this.name = name;this.age = age;this.waitingTime = System.nanoTime();}@Overridepublic int compareTo(Patient o) {// 80岁和以上,返回 -1,权限更高if (age >= 80) {return -1;} /*elseif (o.age >= 80) {return 1;}*///谁先来,谁先执行return waitingTime < o.waitingTime ? -1 : 1;}}
}
DelayQueue(延时队列)
- https://blog.csdn.net/c15158032319/article/details/118636233
DelayQueue 是一个通过PriorityBlockingQueue实现延迟获取元素的无界队列 无界阻塞队列,
- 其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),
- 而且只有在延迟期满后才能从中提取元素。
DelayQueue可以运用在以下应用场景:
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
- 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
案例
- 必然是 延迟3秒,5秒,8秒 这样执行
delayTime=3, expire=1680947698428, data=第二次添加任务
delayTime=5, expire=1680947700428, data=第三次添加任务
delayTime=8, expire=1680947703428, data=第一次添加任务
public class DelayQueueDemo {static BlockingQueue<Delayed> queue = new DelayQueue();public static void main(String[] args) throws InterruptedException {queue.add(new MyDelay(8, "第一次添加任务"));queue.add(new MyDelay(3, "第二次添加任务"));queue.add(new MyDelay(5, "第三次添加任务"));while (!queue.isEmpty()) {Delayed delayed = queue.take();System.out.println(delayed);}}
}class MyDelay<T> implements Delayed {long delayTime; // 延迟时间long expire; // 过期时间T data;public MyDelay(long delayTime, T t) {this.delayTime = delayTime;// 过期时间 = 当前时间 + 延迟时间this.expire = System.currentTimeMillis() + delayTime * 1000;data = t;}/*** 剩余时间 = 到期时间 - 当前时间*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 优先级规则:两个任务比较,时间短的优先执行*/@Overridepublic int compareTo(Delayed o) {long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return (int) f;}@Overridepublic String toString() {return "delayTime=" + delayTime +", expire=" + expire +", data=" + data;}
}
LinkedTransferQueue 链表无界阻塞
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
- 可以算是 LinkedBolckingQueue 和 SynchronousQueue 和合体。
- LinkedTransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部节点分为数据结点、请求结点;基于CAS无锁算法实现
transfer
v.
(使)转移,搬迁;转移(感情),传染(疾病),转让(权力等);(使)调动,转职;转会
n.
转移,转让,调动;(运动员)转会;转换,过渡;已调动的人,已转移的东西;
案例
LinkedTransferQueue<String> g = new LinkedTransferQueue<String>();new Thread(() -> {try {System.out.println("Transferring" + " an element");// Transfer a String element// using transfer() methodg.transfer("is a computer" + " science portal.");System.out.println("Element " + "transfer is complete");} catch (InterruptedException e1) {System.out.println(e1);} catch (NullPointerException e2) {System.out.println(e2);}}).start();try {// Get the transferred elementSystem.out.println("Geeks for Geeks "+ g.take());} catch (Exception e) {System.out.println(e);}
- 放入 null的时候,会报空指针
public static void main(String args[]) {LinkedTransferQueue<String> g = new LinkedTransferQueue<String>();new Thread(() -> {try {System.out.println("Transferring" + " an element");// Transfer a null element// using transfer() methodg.transfer(null);System.out.println("Element " + "transfer is complete");} catch (Exception e) {System.out.println(e);System.exit(0);}}).start();try {// Get the transferred elementSystem.out.println("Geeks for Geeks "+ g.take());} catch (Exception e) {System.out.println(e);}}
Transferring an element
java.lang.NullPointerException
tryTransfer
tryTransfer(E e)方法
tryTransfer(E e) 当生产者线程调用tryTransfer方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和transfer方法的区别就是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费后才返回。
tryTransfer(E e, long timeout, TimeUnit unit) 加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回;如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
transfer(E e)方法
transfer方法,用于将指定元素e传递给消费者线程(调用take/poll方法)。如果有消费者线程正在阻塞等待,则调用transfer方法的线程会直接将元素传递给它;
如果没有消费者线程等待获取元素,则调用transfer方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一个消费者线程获取元素。
2.4 SynchronousQueue 同步队列
-
阻塞队列下的没有容量,
-
进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
-
put、take
/*** 同步队列* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素* put了一个元素,必须从里面先take取出来,否则不能在put进去值!*/
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列new Thread(() -> {try {TimeUnit.SECONDS.sleep(6);System.out.println(Thread.currentThread().getName() + " put 1");blockingQueue.put("1");System.out.println(Thread.currentThread().getName() + " put 2");blockingQueue.put("2");} catch (InterruptedException e) {e.printStackTrace();}}, "T1").start();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}, "T2").start();}
}
2.5 ConcurrentLinkedQueue
是一个无界、线程安全且非阻塞的队列。它不是BlockingQueue的实现类。
BlockingQueue的实现类都是阻塞队列。
ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();queue.offer("哈哈哈");System.out.println("从队列中peek:" + queue.peek());System.out.println("offer后,队列是否空?" + queue.isEmpty());System.out.println("从队列中poll:" + queue.poll());System.out.println("从队列中peek:" + queue.peek());System.out.println("pool后,队列是否空?" + queue.isEmpty());
从队列中peek:哈哈哈offer后,队列是否空?false从队列中poll:哈哈哈从队列中peek:nullpool后,队列是否空?true