> 文章列表 > Java多线程案例-Java多线程(3)

Java多线程案例-Java多线程(3)

Java多线程案例-Java多线程(3)

目录

单例模式

        饿汉模式

        懒汉模式

                单线程

                 多线程版

阻塞队列

        什么是阻塞队列?

        标准库中的阻塞队列

        阻塞队列实现

定时器

        标准库中的定时器

        定时器的实现

        完整代码


单例模式

        单例模式是常见的设计模式之一, 那什么是设计模式呢?

设计模式 : 设计模式好比象棋中的 "棋谱". 红方当头炮, 黑方马来跳. 针对红方的一些走法, 黑方应招的时候有 一些固定的套路. 按照套路来走局势就不会吃亏. 软件开发中也有很多常见的 "问题场景". 针对这些问题场景, 大佬们总结出了一些固定的套路. 按照 这个套路来实现代码, 也不会吃亏.

        单例模式可以保证某个类在程序中只存在唯一一份实例对象.

单例模式的具体实现分为"饿汉" 和 "懒汉" 两种

        饿汉模式

        也就是在加载类的时候, 就生成一个类的实例

class Singleton {// 唯一实例private static Singleton instance = new Singleton();// 禁止对外new 实例对象private Singleton() {};// 获取实例对象的方法public static Singleton getInstance() {return instance;}
}

Singleton类中的instance被static修饰, 也就是说, 这个instance目前归Singleton所有, 而不是单个Singleton的对象,instance属于Singleton这个类, 而不属于任何类对象.

这里将无参构造器使用private的方法给禁止调用, 也就无法调用无参构造方法来进行创建多个实例:

 此处, 在类的内部就将实例创建好了, 同时禁止外部创建实例, 这样就可以保证单例的特性了. 

例如: 

public class Main {public static void main(String[] args) {Singleton test1 = Singleton.getInstance();Singleton test2 = Singleton.getInstance();System.out.println(test2 == test1);}
}

这个静态字段instance在这个Singleton类加载的时候就已经生成好了.

此处的return instance仅仅只是读取操作, 还没有涉及到修改操作, 所以饿汉模式总是线程安全的

        懒汉模式

                单线程版

class Singleton {private static Singleton instance = null;private Singleton() {};public static Singleton getInstance() {if (instance == null) {instance = new Singleton();}return instance;}
}

        懒汉模式只有在调用的时候才会新建对象. 这里同样使用private修饰无参构造方法来进制new实例对象, 同时提供getInstance方法来获取这个类的唯一实例, 此处的instance是默认null的, 只有在调用getInstance的时候才会创建唯一实例.

                 多线程版

        但如果是多个线程一起调用, 这种情况下他真的也只会生成一个对象吗, 对比于饿汉模式, 饿汉模式值存在return instance 的读取操作, 他的唯一实例是在创建这个类的时候就已经生成好了, 并不需要写入操作, 而我们的懒汉模式, 存在读: instance == null 和 写: instance = new Singleton()的操作. 也就是说, 懒汉模式是线程不安全的, 在多线程下, 就有可能会new出多个对象来.

        但是也有人会问, 不就是new处一个对象, 这个影响应该不大呀, 不能这样想, 假设我们的实例对象如果有100G的大小, 或者是1T呢, 线程1多new出一个, 线程2多new出一个1T, 那么就是妥妥的消耗计算机内存资源.

        那么如何解决线程安全呢? 无非就是给这些操作进行加锁, 然后给instance字段加上volatile关键字:

class Singleton {private static volatile Singleton instance = null;private Singleton() {};public static Singleton getInstance() {if (instance == null){synchronized (Singleton.class) {if (instance == null) {instance = new Singleton();}}}return instance;}
}

为什么这里面的instance要判断两次?

        这两个if判断看似一样, 但是实际上他们两个执行的动机差别很大, if中间间隔了一个synchronized, 但是加锁会导致锁竞争从而会让线程阻塞等待

        例如: 当线程1 和线程2 都执行到了第一个if判断语句的时候, 这个时候, 两个线程都读取到了instance == null 为true, 于是就都开始往下执行, 此时遇到sychronized就会发生锁竞争, 此时假设线程1 先拿到锁, 于是线程1就将这个唯一实例给new出来了, 然后解锁, 随后线程2又拿到了这个锁, 但是由于之前也是读取到的instance == null为true, 如果没有遇到第二个if判断instance是否为空, 那么就会直接new对象, 生成两个实例对象, 也就不满足线程安全了.

        这样做的好处是, 加锁和解锁其实是一个开销比较高的事情, 而这种懒汉模式的线程不安全也只是发生在首次创建唯一实例的时候, 后续就不需要继续加锁了.

为什么要加valotile?

        进制指令重排序, 保证字段instance 的内存可见性. 例如在线程1修改了instance之后, 需要立马对通知其他线程, instance已经被修改, 此时就不需要去继续锁竞争.

阻塞队列

        什么是阻塞队列?

        阻塞队列是一种特殊的队列, 他也是先进先出, 但是他在这个先进先出的基础上增加了阻塞等待的功能, 它具有如下的特性:

  1. 当队列为空的时候就会阻塞等待, 直到有新的元素入列, 才会结束阻塞
  2. 当队列为满的时候就会阻塞等待, 直到有元素出列, 才会结束阻塞

这种阻塞原理其实是一个典型的生产消费模型, 也就是通过一个容器来解决两个生产者和消费者之间的强耦合关系

如下:  消费者发出100个请求, 但是生产者就只能同时处理一个请求.

 现在如果消费者有1000个请求, 但是生产者因为请求量过大而'挂掉了, 这就会直接影响到生产者, 导致生产者也'挂了',  两边的相关性太大, 一个被影响, 能直接影响到另外一个线程, 这就叫做高耦合.

但是我们在他两之间加上一个阻塞队列呢, 让生产者能够按顺序, 一个一个的去处理消费者的请求, 那事情不就得到了解决:

         这个阻塞队列就相当于一个缓冲区, 平衡了生产者和消费者之间的处理差. 就比如在各大电商平台上都会有秒杀的活动, 这个时候 如果短时间内有大量请求, 如果没有这个阻塞队列, 服务器直接对这些请求进行处理, 很可能就因为处理量巨大而到时服务器挂掉了, 而在其中加一个阻塞队列就相当于一个缓冲区, 将这些请求都放入缓冲区让线程来慢慢处理. 这就可以防止服务器突然被一波请求给直接冲垮.

        同时如果消费者这边'挂了', 也不会直接影响到生产者这边, 反过来生产者同样如此.

        标准库中的阻塞队列

        在java的标准库中内置了阻塞队列, 如果有需要就可以直接引用:

BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();

特性:

  1. BlockingQueue是一个接口, 真正的实现类是LinkedBlockingQueue,
  2. put和take方法是阻塞队列的常用方法, 其中put是用于入队列, take是出队列
  3. BlockingQueue也有offer, poll, peek等方法, 但是这些方法都不带则色特性

一个简单的例子:

public class ThreadDemo {public static void main(String[] args) throws InterruptedException {BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>();Thread customer = new Thread(()->{while(true) {int value;try {value = blockingDeque.take();} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("消费元素: " + value);}}, "消费者");customer.start();Thread producer = new Thread(()->{Random random = new Random();while (true) {int num = random.nextInt(100);System.out.println("生产元素: " + num );try {blockingDeque.put(num);Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}}},"生产者");producer.start();customer.join();producer.join();}
}

        阻塞队列实现

        我们说, 阻塞队列其实也是队列, 只不过是在队列的基础上增加了阻塞的功能:

        增加synchronized进行加锁控制, 使用size标记法实现循环队列.

class MyBlockingQueue {private int[] items = new int[1000];volatile private int head = 0;volatile private int tail = 0;volatile private int size = 0;synchronized public void put(int val) throws InterruptedException {if (size == items.length) {// 如果队列满了, 就必须阻塞等待this.wait();}items[tail] = val;tail++;if (tail == items.length) {tail = 0;}size++;this.notify();}synchronized public Integer take() throws InterruptedException {if (size == 0) {this.wait();}int value = items[head];head++;if (head == items.length) {head = 0;}size--;this.notify();return value;}
}

理解:

    public void put(int value) throws InterruptedException {synchronized (this) {// 此处最好使用 while.// 否则 notifyAll 的时候, 该线程从 wait 中被唤醒,// 但是紧接着并未抢占到锁. 当锁被抢占的时候, 可能又已经队列满了// 就只能继续等待while (size == items.length) {wait();}items[tail] = value;tail = (tail + 1) % items.length;size++;notifyAll();}}

        这个put方法, 对立面的内容进行加锁操作, 此时, 每个对象对这个队列进行在put操作的时候都会加锁,  如果队列满了就会进入wait阻塞.   同时, 在放入元素的时候, 解除队列为空的阻塞状态
这里最好是使用while循环来控制wait, 因为在使用notifyAll时回打开所有此所对象的线程, 也就是会被唤醒, 但是这个时候size == items.length ,仍然需要继续等待wait.

定时器

        什么是定时器? 定时器是软件开发中的一个重要组件, 类似于一个闹钟, 达到指定的时间后就会执行某个特定的代码.

        标准库中的定时器

        在java标准库中提供了一个Timer类, 其核心方法为schedule
schedule方法中包含了两个参数, 第一个是指定即将要执行的代码, 第二个参数是, 指定多长时间后,执行.

例如:

        Timer timer = new Timer();timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("hello world");}});

        定时器的实现

定时器的构成:

  • 一个优先级阻塞队列:  阻塞队列中的任务都有各自的执行时刻, 最先执行的任务一定是设定时间最短的(delay最小的), 使用带优先级队列就可以高效的把这个delay最小的任务找出来.
  • 队列中每一个元素是一个TimerTask对象
  • Task中带有一个时间属性, 队首元素就是即将被执行的元素
  • 同时有一个worker线程扫描这个队列队首元素, 看这个队首元素是否需要执行 

(1) Timer类提供的核心接口为schedule, 用于注册一个任务, 然后指定多长时间后执行

public class Timer {public void schedule(Runnable command, long after) {// TODO}
}

(2) Task类用来描述一个任务(作为Timer的内部类) , 里面包含一个Runnable对象和一个time(毫秒时间戳)

static class Task implements Comparable<Task> {private Runnable command;private long time;public Task(Runnable command, long time) {this.command = command;// time 中存的是绝对时间, 超过这个时间的任务就应该被执行this.time = System.currentTimeMillis() + time;}public void run() {command.run();}@Overridepublic int compareTo(Task o) {// 谁的时间小谁排前面return (int)(time - o.time);}}
}

(3) Timer实例中, 核心数据结构为PriorityBlockingQueue(优先级阻塞队列, 提供take和put方法, take获取队首元素), 然后通过schedule来往里面插入数据

public Mythimer() {Thread t= new Thread(()->{while (true) {try {synchronized (locker){MyTask myTask = queue.take();long currentTime = System.currentTimeMillis();if (myTask.time <= currentTime) {// 时间到了, 执行任务myTask.runnable.run();} else {// 时间还没到// 把取出的任务塞回去queue.put(myTask);locker.wait(currentTime - myTask.time);}}} catch (InterruptedException e) {throw new RuntimeException(e);}}});}

当使用MyTimer mytimer = new MyTimer(); 调用这个无参构造方法的时候, 会创建一个线程Thread t来扫描这个PriorityBlockingQueue, 每次都拿出队首元素, 也就是:

MyTask myTask = queue.take();
long currentTime = System.currentTimeMillis();

拿出来之后if判断这个队首元素是否需要被执行, 通过设定的时间与当前的计算机系统时间来比较判断. 如果到了就调用run方法执行, 否则就将这个任务塞回优先级阻塞队列, 然后进入wait阻塞等待, 等待时间为这个被放进去的任务的将要被执行的等待时间.

这里的while(true), 是一个死循环, 他的执行速度非常的块, 也会占用系统资源, 计算机每秒访问这个队列很多次, 但是队首元素仍然在等待时间.

public void schedule(Runnable runnable, long delay) {MyTask myTask = new MyTask(runnable,delay);queue.put(myTask);synchronized (locker) {locker.notify();}}

但是, 如果有其他的将会被更早的执行的任务插入插入队列的话, 那么之前的队首元素就不是最先执行的, 但是现在仍然在wait等待, 也就是现在需要使用notify将其唤醒. 然后重新从优先级阻塞队列里面取到剩余时间最短的任务.

        完整代码

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;class MyTimer  {// 带有优先级的阻塞队列 (核心数据结构)private PriorityBlockingQueue<MyTask> queue = new PriorityBlockingQueue<>(); // 这里的元素是什么呢?// 此处的delay是一个形如3000 这样的数字()public void schedule(Runnable runnable, long delay) {synchronized (locker) {MyTask myTask = new MyTask(runnable,delay);queue.put(myTask);locker.notify();}}// 构造线程执行任务public MyTimer() {Thread t= new Thread(()->{while (true) {try {synchronized (locker){while (queue.isEmpty()) {locker.wait();}MyTask myTask = queue.take();long currentTime = System.currentTimeMillis();if (myTask.time <= currentTime) {// 时间到了, 执行任务myTask.runnable.run();} else {// 时间还没到// 把取出的任务塞回去queue.put(myTask);locker.wait(myTask.time - currentTime);}}} catch (InterruptedException e) {throw new RuntimeException(e);}}});t.start();}// 创建锁对象private Object locker= new Object();}// 创建一个类, 表示两方面的信息
// 1.执行的任务
// 2.任务什么时候执行class MyTask implements Comparable<MyTask>{// Runnable实现类public Runnable runnable;// 什么时间点执行(实际执行时间)public long time;public MyTask(Runnable runnable, long delay) {this.runnable = runnable;this.time = System.currentTimeMillis() + delay;}@Overridepublic int compareTo(MyTask o) {return (int) (this.time - o.time);}
}public class TestDemoMyTimer {public static void main(String[] args) {MyTimer myTimer = new MyTimer();myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello4");}}, 4000);myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello3");}}, 3000);myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello2");}}, 2000);myTimer.schedule(new Runnable() {@Overridepublic void run() {System.out.println("hello1");}}, 1000);System.out.println("hello0");}
}