> 文章列表 > 【阻塞队列和并发修改异常】fail-fast和fail-safe,阻塞队列体系,3组方法,优先阻塞队列,延迟队列,链表无界阻塞,同步队列,非阻塞队列

【阻塞队列和并发修改异常】fail-fast和fail-safe,阻塞队列体系,3组方法,优先阻塞队列,延迟队列,链表无界阻塞,同步队列,非阻塞队列

【阻塞队列和并发修改异常】fail-fast和fail-safe,阻塞队列体系,3组方法,优先阻塞队列,延迟队列,链表无界阻塞,同步队列,非阻塞队列

1. 集合3种错误

迭代器的remove报错

  • 最顶层报错
public interface Iterator<E> {default void remove() {throw new UnsupportedOperationException("remove");}
}
  • 实现类 会报 这个错
public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E> { public void remove() {if (lastRet < 0)throw new IllegalStateException();}private class Itr implements Iterator<E> {int lastRet = -1;//默认为-1//执行第一步public void remove() {if (lastRet < 0)//报错1throw new IllegalStateException();//使用next后,进入这里checkForComodification();}try {AbstractList.this.remove(lastRet);if (lastRet < cursor)cursor--;lastRet = -1;expectedModCount = modCount;} catch (IndexOutOfBoundsException e) {throw new ConcurrentModificationException();}}
}

直接移除报错IllegalStateException

所以list会报 IllegalStateException

            List<Integer> list = Arrays.asList(1, 2, 3);Iterator<Integer> iterator = list.iterator();//直接移除报错:Exception in thread "Thread-0" java.lang.IllegalStateExceptioniterator.remove();//依然是同样的错List<Integer> list = new ArrayList<>();
  • 正常使用的逻辑
        List<Integer> list = new ArrayList<>();list.add(1);Iterator<Integer> iterator = list.iterator();iterator.next();iterator.remove();

Arrays.asList的next后UnsupportedOperationException

  • UnsupportedOperationException
            List<Integer> list = Arrays.asList(1, 2, 3);//固定长度的ArrayListSystem.out.println(iterator.next());//Exception in thread "Thread-0" java.lang.UnsupportedOperationExceptioniterator.remove();
        List<Integer> list = Arrays.asList(1, 2, 3);list.add(333);//Exception in thread "main" java.lang.UnsupportedOperationException//最终调用的源码如下。因为:Arrays的内部类ArrayList,没添加的方法。//会调用父类的 AbstractList,是 扔异常的public void add(int index, E element) {throw new UnsupportedOperationException();}
Arrays.asList(1,2,3) //是一个特殊的 ArrayListpublic static <T> List<T> asList(T... a) {return new ArrayList<>(a);}//核心是 Arrays的内部类private static class ArrayList<E> extends AbstractList<E>implements RandomAccess, java.io.Serializable{ArrayList(E[] array) {a = Objects.requireNonNull(array);}}
        public void remove() {if (lastRet < 0)//报错1throw new IllegalStateException();//使用next后,进入这里。lastRet 为 0checkForComodification();try {//将会执行 remove的错误。即:最顶层的错误AbstractList.this.remove(lastRet);}}//这里并没有 并发异常,不走这个错误final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}

ConcurrentModificationException

        List<Integer> list = new ArrayList();list.add(1);Iterator<Integer> iterator = list.iterator();//原来只有一个元素。hashNext第二次会返回 false//现在 hashNext 第二次依然返回truewhile (iterator.hasNext()) {//第二次 进入 next 方法。进入检查的方法,就会报错://Exception in thread "main" java.util.ConcurrentModificationExceptioniterator.next();//因为 这里加入了一个元素。list.add(4);}
        public E next() {checkForComodification();}final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}

CopyOnWriteArrayList 最新的添加不显示

        //底层是 cas,会复制一份//    private transient volatile Object[] array;List<Integer> list = new CopyOnWriteArrayList<>();list.add(1);Iterator<Integer> iterator = list.iterator();//第二次循环,hasNext 会返回 falsewhile (iterator.hasNext()) {iterator.next();list.add(4);}

fail-fast和fail-safe

迭代器:Java Collection 定义了两种类型的迭代器:fail-fast和fail-safe。

​ fail-fast:如果集合在迭代时被修改,则立即抛出 ConcurrentModificationException 。

  • java.util包下的集合类都是fail-fast,不能在多线程下发生并发修改(迭代过程中被修改)。
  • 它们都在迭代器的实现中声明了一个transient int modCount,通过比较该值判断集合是否被修改。
    		protected transient int modCount = 0;int expectedModCount = modCount;final void checkForComodification() {if (modCount != expectedModCount)throw new ConcurrentModificationException();}

fail-safe:如果集合在迭代时被修改,不会抛出 ConcurrentModificationException,

  • 因为它们操作的是集合的克隆,而不是实际的集合。
  • java.util.concurrent包下的集合都是fail-safe,可以在多线程下并发使用,并发修改。

并发修改:简单的说,并发修改在一个JVM内是多个线程同时修改同一个对象的过程。

2. 阻塞队列BlockingQueue

2.1 核心的3组方法

add remove offer poll加时间 put take

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用

抛出异常

当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException: Queue full

当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException

特殊值

插入方法,成功ture失败false

移除方法,成功返回出队列的元素,队列里面没有就返回null

一直阻塞

当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。

当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。

        BlockingQueue<String> b = new ArrayBlockingQueue(2);System.out.println(b.add("1"));System.out.println(b.add("2"));//llegalStateException: Queue full//System.out.println(b.add("3"));//在开头的为:1System.out.println("在开头的为:" + b.element());//先进先出,先出来 1System.out.println(b.remove());System.out.println(b.remove());//java.util.NoSuchElementExceptionSystem.out.println(b.remove());

2.2 阻塞队列解释

  • 如果队列已满(当队列有界时)或变为空,则队列 阻塞访问线程
    • 如果队列已满,则添加新元素将阻塞访问线程,除非有空间可用于新元素。
    • 同样,如果队列为空,则访问元素会阻塞调用线程。

无界队列:在创建时未指定队列的大小

  • 因此,队列可以随着元素的添加而动态增长。但是,如果没有剩余内存,则会抛出java.lang.OutOfMemoryError。

2.3 体系

ArrayBlockingQueue是有界的 (固定大小)阻塞队列实现,

LinkedBlockingQueue是一个可选的有界阻塞队列实现,

PriorityBlockingQueue是一个可以按特定顺序消费项目,

  • 支持优先级排序的无界阻塞队列。

DelayQueue 队列实现的延迟无界阻塞队列

LinkedTransferQueue提供了实现一种背压形式的能力。

  • 由链表结构组成的无界阻塞队列

SynchronousQueue提供在线程之间交换数据的简单方法

  • 只有一个不取走,不生产第二个。

  • Collection

    • Queue 队列
      • BlockingQueue 阻塞队列接口
        • LinkedTransferQueue 由链表结构组成的无界阻塞队列。
        • BlockingDeque
        • LinkedBlockingDeque 由链表结构组成的双向阻塞队列
        • Priority BlockingQueue 支持优先级排序的无界阻塞队列。
        • *** SynchronousQueue**
          • 不存储元素的阻塞队列,也即单个元素的队列。
            • 只有一个不取走,不生产第二个。
        • DelayQueue 使用优先级队列实现的延迟无界阻塞队列。
        • *** Array** BlockingQueue
          • 由数组结构组成的有界阻塞队列。
        • *** Linked** BlockingQueue
          • 由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
block
n.
大块,一块(木料、石头等);(四面临街的)街段,
v.
阻塞,堵塞(道路、管道等);遮住(视线)priority
英
/praɪˈɒrəti/
n.
优先事项,最重要的事;优先,优先权,重点;<英>优先通行权
adj.
优先的synchron ized
synchronous
英
/ˈsɪŋkrənəs/
adj.
同步的;同时的offer
英
/ˈɒfə(r)/
v.
提供,给予;提议,表示愿意(做某事);出(价),开(价);提出,作出;表示(爱、友谊等);
n.
主动提议,提供; 出价,报价; (商品的)特价,特惠;求婚poll
n.
民意调查,民意测验;选举投票,计票;投票数;
v.
对……进行民意测验(调查);获得(票数);(电信,计算机)轮询,探询;peek
英
/piːk/
v.
偷看,窥视;微露出,探出
n.
一瞥,偷偷地一看;(计算机)读取数据

优先队列PriorityBlockingQueue

  • 总之会根据对象的 PriorityBlockingQueue决定权重
public class PriorityBlockingQueueTest {public static void main(String[] args) {//长度10的 排队阻塞队列PriorityBlockingQueue<Patient> pbq = new PriorityBlockingQueue<>(10);//创建3个 年轻人,放入for (int i = 0; i < 3; i++) {Patient patent = new Patient("Patent" + i, 20 + i);pbq.offer(patent);}//创建一个 年老的人Patient oldMan = new Patient("OldMan", 88);pbq.offer(oldMan);Patient patient = null;do {//逐步弹出patient = pbq.poll();if (patient != null) {System.out.println(patient.name + "挂号成功!");}} while (patient != null);}static class Patient implements Comparable<Patient> {private String name;private Integer age;private long waitingTime;public Patient(String name, Integer age) {this.name = name;this.age = age;this.waitingTime = System.nanoTime();}@Overridepublic int compareTo(Patient o) {// 80岁和以上,返回 -1,权限更高if (age >= 80) {return -1;} /*elseif (o.age >= 80) {return 1;}*///谁先来,谁先执行return waitingTime < o.waitingTime ? -1 : 1;}}
}

DelayQueue(延时队列)

  • https://blog.csdn.net/c15158032319/article/details/118636233

DelayQueue 是一个通过PriorityBlockingQueue实现延迟获取元素的无界队列 无界阻塞队列

  • 其中添加进该队列的元素必须实现Delayed接口(指定延迟时间),
  • 而且只有在延迟期满后才能从中提取元素

DelayQueue可以运用在以下应用场景:

  1. 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  2. 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

案例

  • 必然是 延迟3秒,5秒,8秒 这样执行
delayTime=3, expire=1680947698428, data=第二次添加任务
delayTime=5, expire=1680947700428, data=第三次添加任务
delayTime=8, expire=1680947703428, data=第一次添加任务
public class DelayQueueDemo {static BlockingQueue<Delayed> queue = new DelayQueue();public static void main(String[] args) throws InterruptedException {queue.add(new MyDelay(8, "第一次添加任务"));queue.add(new MyDelay(3, "第二次添加任务"));queue.add(new MyDelay(5, "第三次添加任务"));while (!queue.isEmpty()) {Delayed delayed = queue.take();System.out.println(delayed);}}
}class MyDelay<T> implements Delayed {long delayTime; // 延迟时间long expire; // 过期时间T data;public MyDelay(long delayTime, T t) {this.delayTime = delayTime;// 过期时间 = 当前时间 + 延迟时间this.expire = System.currentTimeMillis() + delayTime * 1000;data = t;}/*** 剩余时间 = 到期时间 - 当前时间*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}/*** 优先级规则:两个任务比较,时间短的优先执行*/@Overridepublic int compareTo(Delayed o) {long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);return (int) f;}@Overridepublic String toString() {return "delayTime=" + delayTime +", expire=" + expire +", data=" + data;}
}

LinkedTransferQueue 链表无界阻塞

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • 可以算是 LinkedBolckingQueue 和 SynchronousQueue 和合体。
  • LinkedTransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部节点分为数据结点、请求结点;基于CAS无锁算法实现
transfer
v.
(使)转移,搬迁;转移(感情),传染(疾病),转让(权力等);(使)调动,转职;转会
n.
转移,转让,调动;(运动员)转会;转换,过渡;已调动的人,已转移的东西;

案例

LinkedTransferQueue<String> g = new LinkedTransferQueue<String>();new Thread(() -> {try {System.out.println("Transferring" + " an element");// Transfer a String element// using transfer() methodg.transfer("is a computer" + " science portal.");System.out.println("Element " + "transfer is complete");} catch (InterruptedException e1) {System.out.println(e1);} catch (NullPointerException e2) {System.out.println(e2);}}).start();try {// Get the transferred elementSystem.out.println("Geeks for Geeks "+ g.take());} catch (Exception e) {System.out.println(e);}
  • 放入 null的时候,会报空指针
    public static void main(String args[]) {LinkedTransferQueue<String> g = new LinkedTransferQueue<String>();new Thread(() -> {try {System.out.println("Transferring" + " an element");// Transfer a null element// using transfer() methodg.transfer(null);System.out.println("Element " + "transfer is complete");} catch (Exception e) {System.out.println(e);System.exit(0);}}).start();try {// Get the transferred elementSystem.out.println("Geeks for Geeks "+ g.take());} catch (Exception e) {System.out.println(e);}}
Transferring an element
java.lang.NullPointerException

tryTransfer

tryTransfer(E e)方法

tryTransfer(E e) 当生产者线程调用tryTransfer方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和transfer方法的区别就是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费后才返回。

tryTransfer(E e, long timeout, TimeUnit unit) 加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回;如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

transfer(E e)方法

transfer方法,用于将指定元素e传递给消费者线程(调用take/poll方法)。如果有消费者线程正在阻塞等待,则调用transfer方法的线程会直接将元素传递给它;

如果没有消费者线程等待获取元素,则调用transfer方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一个消费者线程获取元素。

2.4 SynchronousQueue 同步队列

  • 阻塞队列下的没有容量,

  • 进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

  • put、take

/*** 同步队列* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素* put了一个元素,必须从里面先take取出来,否则不能在put进去值!*/
public class SynchronousQueueDemo {public static void main(String[] args) {BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); // 同步队列new Thread(() -> {try {TimeUnit.SECONDS.sleep(6);System.out.println(Thread.currentThread().getName() + " put 1");blockingQueue.put("1");System.out.println(Thread.currentThread().getName() + " put 2");blockingQueue.put("2");} catch (InterruptedException e) {e.printStackTrace();}}, "T1").start();new Thread(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}, "T2").start();}
}

2.5 ConcurrentLinkedQueue

是一个无界、线程安全且非阻塞的队列。它不是BlockingQueue的实现类。

BlockingQueue的实现类都是阻塞队列。

        ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue();queue.offer("哈哈哈");System.out.println("从队列中peek:" + queue.peek());System.out.println("offer后,队列是否空?" + queue.isEmpty());System.out.println("从队列中poll:" + queue.poll());System.out.println("从队列中peek:" + queue.peek());System.out.println("pool后,队列是否空?" + queue.isEmpty());
从队列中peek:哈哈哈offer后,队列是否空?false从队列中poll:哈哈哈从队列中peek:nullpool后,队列是否空?true