> 文章列表 > Java并发篇二

Java并发篇二

Java并发篇二

ForkJoin 

在JDK1.7,并行执行任务,提高效率,大数据量才会使用

特点:大任务拆分成小任务,工作窃取,里面维护的是双端队列

package com.kuang.forkjoin;import java.util.concurrent.RecursiveTask;/*** 如何使用forkjoin* 1.forkjoinpool通过它来执行* 2.计算任务forkjoinpool.execute(ForkJoinTask task)* 3.计算类要继承ForkJoinTask*/
public class ForkJoinDemo extends RecursiveTask<Long> {private Long start;private Long end;private Long temp = 10000L;//临界值public ForkJoinDemo(Long start, Long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {if (end - start < temp) {Long sum = 0L;for (Long i = start; i <= end; i++) {sum += i;}return sum;} else {//forkjoin递归long middle = (start + end) / 2;ForkJoinDemo task1 = new ForkJoinDemo(start, middle);task1.fork();//拆分任务,把任务压入线程队列ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);task2.fork();//拆分任务,把任务压入线程队列return task1.join()+task2.join();}}
}

求和计算优化

package com.kuang.forkjoin;import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;public class Test {public static void main(String[] args) throws ExecutionException, InterruptedException {test1();//5190//test2();//3618//test3();//162}//初级程序员public static void test1() {long begin = System.currentTimeMillis();Long sum = 0L;//为什么使用long反而更快些for (Long i = 1L; i <= 10_0000_0000L; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("sum=" + sum + "消耗时间" + (end - begin));}//中级程序员ForkJoinpublic static void test2() throws ExecutionException, InterruptedException {long begin = System.currentTimeMillis();ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L, 10_0000_0000L);ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);//异步提交有返回值Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("sum=" + sum + "消耗时间" + (end - begin));}//高级程序员并行流(]左开右闭public static void test3() {long begin = System.currentTimeMillis();long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0L, Long::sum);long end = System.currentTimeMillis();System.out.println("sum=" + sum + "消耗时间" + (end - begin));}}

异步回调

package com.kuang.future;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** 异步调用:CompletableFuture* // 异步执行* // 成功回调* // 失败回调*/
public class Demo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 没有返回值的异步回调 runAsync/*CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"=>void");});System.out.println("1111");completableFuture.get();//获取阻塞执行结果*/// 有返回值的异步回调 supplyAsync// ajax 成功和失败的回调// 返回错误信息CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{//供给型接口 有返回值无参数System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");int i=10/0;return 1024;});System.out.println(completableFuture.whenComplete((t, u) -> {//消费型接口 有参数无返回值System.out.println("t=>"+t);//正常返回结果 t=>nullSystem.out.println("u=>"+u);//错误信息:u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero}).exceptionally((e) -> {//函数型接口 有参数有返回值System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zeroreturn 111;//可以获取错误的返回结果}).get());}
}

JMM

volatile关键字是JVM提供轻量级的同步机制

JMM是java内存模型,不存在东西,概念规定

关于JMM的一些约定

线程解锁前,必须把共享变量立即刷回主存

线程加锁前,必须读取主存中最新值到工作内存中

加锁和解锁是同一把锁

8种操作(assign赋值)

 write和store反了

 

Volatile

特点:保证可见性、不保证原子、禁止指令重排

保证可见性示例代码

package com.kuang.tvolatile;import java.util.concurrent.TimeUnit;public class JMMDemo {// 不加volatile会死循环private volatile static int num=0;public static void main(String[] args) {new Thread(()->{while (num==0){}}).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}num=1;System.out.println(num);}
}

原子性:不可分割

线程A在执行任务的时候不能被打扰也不能被分割,要么同时成功,要么同时失败 

不保证原子性示例代码: 

package com.kuang.tvolatile;public class VDemo02 {private volatile static int num = 0;public static void add() {num++;}public static void main(String[] args) {for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + " " + num);}
}

如果不加synchronized和lock,如何保证原子性,使用原子类解决原子性问题

 这些类的底层都直接和操作系统挂钩,在内存中修改值,Unsafe特殊的类

package com.kuang.tvolatile;import java.util.concurrent.atomic.AtomicInteger;public class VDemo02 {private volatile static AtomicInteger num=new AtomicInteger();public static void add() {//num++;//不是一个原子性操作num.getAndIncrement();}public static void main(String[] args) {for (int i = 0; i < 20; i++) {new Thread(() -> {for (int j = 0; j < 1000; j++) {add();}}).start();}while (Thread.activeCount() > 2) {Thread.yield();}System.out.println(Thread.currentThread().getName() + " " + num);}
}

禁止指令重排 

指令重排:你写的程序,计算机并不是按照你写的那样去执行的

处理器在执行重排的时候需要考数据之间的依赖性

源代码-->编译器优化的重排-->指令并行也可能重排-->内存系统也会重排-->执行

volatile可以避免指令重排(由于内存屏障)

内存屏障就想象成我们的CPU指令,它有两个作用

可以保证特定的操作的执行顺序

可以保证某些变量的内存可见性(利用这些特性,volatile实现了可见性)

彻底玩转单例模式

package com.kuang.single;
// 饿汉式 (不想用也会把这些东西创建出来,造成浪费空间)
public class HungryMan {// 可能会浪费空间private byte[] data1=new byte[1024*1024];private byte[] data2=new byte[1024*1024];private byte[] data3=new byte[1024*1024];// 无参构造private HungryMan(){}private final static HungryMan hungryMan=new HungryMan();public HungryMan getHungryMan(){return hungryMan;}}
package com.kuang.single;
// 懒汉式
public class LazyMan {private LazyMan(){System.out.println(Thread.currentThread().getName()+"ok");}private volatile static LazyMan lazyMan;//双重检测锁模式的 懒汉式单例   DCL懒汉式public static LazyMan getInstance(){// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)if (lazyMan==null){// 锁class代表只锁一个synchronized(LazyMan.class){if (lazyMan==null){return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性/*** 分配内存空间* 执行构造方法,初始化对象* 把这个对象指向这个空间** 期望123* 132 线程A是允许的* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造* 为了避免指令重排,需要在该对象上加上关键字volatile*/}}}return lazyMan;}public static void main(String[] args) {// 多线程并发 就有问题了(解决方案:加锁)for (int i = 0; i < 10; i++) {new Thread(()->{LazyMan.getInstance();}).start();}}
}

这里双重检测加锁是保证了操作原子性,只有一个线程能创建一个实例,其他线程无法创建第二个

volatile关键字是为了防止因为指令重排导致的多线程问题,有可能线程A创建一个实例,虚拟机只执行了分配空间,对象地址引用这两步,这时线程B过来发现对象已经被创建了,但是获取到的对象是还没有被初始化的

反射破坏单例一: 一个反射一个类的方法创建

// 反射破坏单例public static void main(String[] args) throws Exception {LazyMan instance = LazyMan.getInstance();Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);               LazyMan instance2 = declaredConstructor.newInstance();System.out.println(instance);System.out.println(instance2);}

 

 解决破坏单例的方法添加异常 添加之后如果要破坏单例的话两个都使用反射创建就行了

package com.kuang.single;import java.lang.reflect.Constructor;// 懒汉式
public class LazyMan {private LazyMan(){synchronized (LazyMan.class){if (lazyMan!=null)throw new RuntimeException("不要试图使用反射破坏异常");}}private volatile static LazyMan lazyMan;//双重检测锁模式的 懒汉式单例   DCL懒汉式public static LazyMan getInstance(){// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)if (lazyMan==null){// 锁class代表只锁一个synchronized(LazyMan.class){if (lazyMan==null){return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性/*** 分配内存空间* 执行构造方法,初始化对象* 把这个对象指向这个空间** 期望123* 132 线程A是允许的* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造* 为了避免指令重排,需要在该对象上加上关键字volatile*/}}}return lazyMan;}public static void main(String[] args) throws Exception {LazyMan instance = LazyMan.getInstance();//如果把这行注释,下面2行注释放开就又破坏单例了Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);//LazyMan instance1 = declaredConstructor.newInstance();LazyMan instance2 = declaredConstructor.newInstance();System.out.println(instance);//System.out.println(instance1);System.out.println(instance2);}
}

反射破坏单例二:添加标志位需要重新设置就可以破坏了

解决破坏单例二:添加标志位即可不要修改就行

package com.kuang.single;import java.lang.reflect.Constructor;
import java.lang.reflect.Field;// 懒汉式
public class LazyMan {private static boolean flag=false;private LazyMan(){synchronized (LazyMan.class){if (flag==false){//第一次执行无论通不通过反射,都会变为trueflag=true;}else {throw new RuntimeException("不要试图使用反射破坏异常");}}}private volatile static LazyMan lazyMan;//双重检测锁模式的 懒汉式单例   DCL懒汉式public static LazyMan getInstance(){// 加锁(加锁之前,可能会被2个线程或以上拿到,所以需要两次判断)if (lazyMan==null){// 锁class代表只锁一个synchronized(LazyMan.class){if (lazyMan==null){return lazyMan=new LazyMan();//加锁之后极端情况也会出现问题 赋值不保证原子性/*** 分配内存空间* 执行构造方法,初始化对象* 把这个对象指向这个空间** 期望123* 132 线程A是允许的* 线程B由于线程A已经对象指向这个空间了,lazyMan不等于null就return lazyMan但是此时这个lazyMan还没有完成构造* 为了避免指令重排,需要在该对象上加上关键字volatile*/}}}return lazyMan;}// 反射破坏单例// 把下面三行注释放开就是破坏单例public static void main(String[] args) throws Exception {//Field flag = LazyMan.class.getDeclaredField("flag");//flag.setAccessible(true);Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);LazyMan instance1 = declaredConstructor.newInstance();//flag.set(instance1,false);LazyMan instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

package com.kuang.single;// 静态内部类
public class Holder {private Holder(){}public static Holder getInstance(){return InnerClass.HOLDER;}public static class InnerClass{private static final Holder HOLDER=new Holder();}
}

不能使用反射破坏枚举

package com.kuang.single;import java.lang.reflect.Constructor;public enum EnumSingle {INSTANCE;public EnumSingle getInstance(){return INSTANCE;}
}class Test{public static void main(String[] args) throws Exception {EnumSingle instance1= EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

 

 反编译javap -p EnumSingle.class发现它是有参构造而不是无参构造

package com.kuang.single;import java.lang.reflect.Constructor;public enum EnumSingle {INSTANCE;public EnumSingle getInstance(){return INSTANCE;}
}class Test{public static void main(String[] args) throws Exception {EnumSingle instance1= EnumSingle.INSTANCE;Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(String.class,int.class);declaredConstructor.setAccessible(true);EnumSingle instance2 = declaredConstructor.newInstance();System.out.println(instance1);System.out.println(instance2);}
}

 

深入理解CAS(比较并交换)

当前工作内存中的值与主内存值进行比较,如果这个值是期望的,那么则执行操作,如果不是就一直循环

缺点:循环会耗时、一次性只能保证一个共享变量的原子性、引发ABA问题

package com.kuang.cas;import java.util.concurrent.atomic.AtomicInteger;
/*** CAS 比较并交换*/
public class CASDemo {public static void main(String[] args) {AtomicInteger atomicInteger=new AtomicInteger(2020);System.out.println(atomicInteger.compareAndSet(2020, 2021));System.out.println(atomicInteger);atomicInteger.getAndIncrement();System.out.println(atomicInteger.compareAndSet(2020, 2021));System.out.println(atomicInteger);}
}

自旋锁 

var1当前对象+var2偏移量如果是var5的话就执行操作var5+var4 

原子引用解决ABA问题

package com.kuang.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;/*** 解决ABA问题,引入原子引用*/
public class CASDemo {public static void main(String[] args) {//如果泛型是一个包装类,注意对象引用问题AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<>(1,1);new Thread(()->{int stamp=atomicStampedReference.getStamp();System.out.println("a1=>"+stamp);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}atomicStampedReference.compareAndSet(1,2,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);System.out.println("a2=>"+atomicStampedReference.getStamp());atomicStampedReference.compareAndSet(2,1,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);System.out.println("a3=>"+atomicStampedReference.getStamp());},"a").start();new Thread(()->{int stamp=atomicStampedReference.getStamp();System.out.println("b1=>"+stamp);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}atomicStampedReference.compareAndSet(1,6,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);System.out.println("b2=>"+atomicStampedReference.getStamp());},"b").start();}
}

 

带版本号的原子操作:Integer使用了对象缓存机制,默认范围是-128~127,推荐使用静态工厂方法valueOf获取对象实例,而不是new,因为valueOf使用缓存,而new一定会创建新的对象分配新的内存空间

各种锁的理解 

可重入锁(递归锁)

synchronized锁

package com.kuang.lock;public class Demo01 {public static void main(String[] args) {Phone phone=new Phone();new Thread(()->{phone.sendMessage();},"A").start();new Thread(()->{phone.sendMessage();},"B").start();}
}
class Phone{public synchronized void sendMessage(){System.out.println(Thread.currentThread().getName()+"=>"+"发短信");call();//这里可以看做也有锁}public synchronized void call(){System.out.println(Thread.currentThread().getName()+"=>"+"打电话");}
}

Lock版

package com.kuang.lock;import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class Demo02 {public static void main(String[] args) {Phone2 phone=new Phone2();new Thread(()->{phone.sendMessage();},"A").start();new Thread(()->{phone.sendMessage();},"B").start();}
}
class Phone2{Lock lock=new ReentrantLock();public void sendMessage(){lock.lock();try {System.out.println(Thread.currentThread().getName()+"=>"+"发短信");call();} catch (Exception e) {e.printStackTrace();}finally {lock.unlock();}}public void call(){// 细节问题:锁要配对lock.lock();try {System.out.println(Thread.currentThread().getName()+"=>"+"打电话");} catch (Exception e) {e.printStackTrace();}finally {lock.unlock();}}
}

自旋锁

package com.kuang.lock;import java.util.concurrent.atomic.AtomicReference;public class SpinLockDemo {AtomicReference<Thread> atomicReference=new AtomicReference<>();public void myLock(){Thread thread = Thread.currentThread();System.out.println(Thread.currentThread().getName()+"=>myLock");while (!atomicReference.compareAndSet(null, thread)) {}}public void myUnlock(){Thread thread = Thread.currentThread();System.out.println(Thread.currentThread().getName()+"=>myUnLock");atomicReference.compareAndSet(thread,null);}
}
package com.kuang.lock;import java.util.concurrent.TimeUnit;public class SpinLockTest {public static void main(String[] args) throws InterruptedException {SpinLockDemo lock = new SpinLockDemo();new Thread(()->{lock.myLock();try {TimeUnit.SECONDS.sleep(3);} catch (Exception e) {e.printStackTrace();}finally {lock.myUnlock();}},"A").start();new Thread(()->{lock.myLock();try {} catch (Exception e) {e.printStackTrace();}finally {lock.myUnlock();}},"B").start();}
}

死锁排查 

package com.kuang.lock;import java.util.concurrent.TimeUnit;public class DeadLockDemo {public static void main(String[] args) {String lockA="lockA";String lockB="lockB";new Thread(new MyThread(lockA,lockB),"lockA").start();new Thread(new MyThread(lockB,lockA),"lockB").start();}
}class MyThread implements Runnable{private String lockA;private String lockB;public MyThread(String lockA, String lockB) {this.lockA = lockA;this.lockB = lockB;}@Overridepublic void run() {synchronized (lockA){System.out.println(Thread.currentThread().getName()+"lock:"+lockA+"get" +lockB);try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lockB){System.out.println(Thread.currentThread().getName()+"lock:"+lockB+"get" +lockA);}}}
}

使用jps -l定位进程号 接着使用jstack 进程号找到死锁问题 

 

 面试说排查死锁:日志+堆栈