> 文章列表 > Spark结合多线程批量执行任务

Spark结合多线程批量执行任务

Spark结合多线程批量执行任务

一、目的

  1. 避免资源的浪费

  2. 提高任务执行的效率

  3. 防止任务未执行完毕,session和线程池已关闭,导致任务失败

二、异常coding

  1. method 1
for (String s : list) {Dataset<Row> sql = sparkSession.sql(s);sql.show();
}
sparkSession.close();
System.out.println("=====任务执行完毕====");
  1. method 2
//启动多线程
ExecutorService executorService = Executors.newFixedThreadPool(list.size());for (String s : list) {executorService.submit(new Runnable() {@Overridepublic void run() {Dataset<Row> sql = sparkSession.sql(s);sql.show();}});
}
//关闭session和线程池
executorService.shutdown();
sparkSession.close();
System.out.println("=====任务执行完毕====");

上述两段代码有很大的问题,以及会有异常产生

  • method 1:

    任务是一个一个串行执行,例如:

    若该任务申请到10个core,10G内存,而在执行第一个sql1时,job只使用了4个Core和2G内存,那么就造成了资源的浪费和剩下的任务还在等待该job的执行,不能做到资源的合理使用,而且任务时串行执行效率慢。

    弊端:a.浪费资源 b.执行效率慢

  • method2:

    虽然是使用多线程提交任务,但是会发生任务未执行完毕session提前关闭的异常

三、coding–解决方案

  1. 使用CountDownLatch的计数器批量提交任务

    1. 计数器的初始大小和任务数量保持一致(和线程数无关)

    2. 每执行完一次任务计数器减一

    3. await()方法会一致阻塞,直到计数器的值减为0,才会释放锁,以便所有任务执行完毕后继续执行下一步操作

/*** 批量执行sql任务*/
public class Test {public static void main(String[] args) throws Exception {//创建批量sql任务String sql1 = "select count(1) from pub_penalty where dt=20210106";String sql2 = "select count(1) from pub_penalty_tmp";String sql3 = "select count(1) from pub_permission_tmp";String sql4 = "select count(1) from pub_permission";String sql5 = "select count(1) from test_sort";ArrayList<String> list = new ArrayList<>();list.add(sql1);list.add(sql2);list.add(sql3);list.add(sql4);list.add(sql5);//初始化SparkSessionSparkSession sparkSession = initSparkSession();//初始化CountDownLatch计数器,计数器大小和任务数保持一致CountDownLatch countDownLatch = new CountDownLatch(list.size());//启动多线程ExecutorService executorService = Executors.newFixedThreadPool(list.size());for (String s : list) {executorService.submit(new Runnable() {@Overridepublic void run() {Dataset<Row> sql = sparkSession.sql(s);System.out.println(s + "---->runing..........." + sql.count());//计数器减一countDownLatch.countDown();}});}//阻塞等待countDownLatch.await();System.out.println("----->执行完毕");//关闭session和线程池sparkSession.close();executorService.shutdown();
}private static SparkSession initSparkSession() {System.setProperty("hadoop.home.dir", "D:\\\\appinstall");System.setProperty("HADOOP_USER_NAME", "bbdoffline");SparkConf conf = new SparkConf();conf.setAppName("bbd-wgj");SparkSession sparkSession = SparkSession.builder().config(conf).master("local[*]").enableHiveSupport().getOrCreate();sparkSession.sparkContext().setLogLevel("WARN");return sparkSession;
}
  1. 使用Callable提交任务,通过返回值Future的阻塞方法get()批量提交任务

    1. 使用Callable具有返回值的多线程方法提交任务

    2. 返回值Future 的get()方法是个阻塞方法,会等待结果的返回,直到任务结束

......主体代码......
//启动多线程
ExecutorService executorService = Executors.newFixedThreadPool(list.size());
ArrayList<Future<String>> list1 = new ArrayList<>();
for (String s : list) {//使用Callable具有返回值的多线程方法提交任务Future<String> submit = executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {Dataset<Row> sql = sparkSession.sql(s);sql.show();return "success" + s;}});//将任务返回值添加到list集合list1.add(submit);
}for (Future<String> result : list1) {try {//get是一个阻塞方法,获取结果值String retult = result.get();System.out.println(retult);} catch (Exception e) {e.printStackTrace();}
}//关闭session和线程池
executorService.shutdown();
sparkSession.close();
System.out.println("=====任务执行完毕====");

注意线程池中 submit方法 和 executor方法的区别

多线程提交任务时,当资源申请足够多时,会同时执行!!!即使资源不足,也会在上一个任务结束释放资源后立即执行

注意:任务中视图表的创建、临时表的创建、等共享变量的创建,多个任务同时执行时,会造成同时使用!!!