> 文章列表 > SpringMVC异步请求

SpringMVC异步请求

SpringMVC异步请求

背景

Tomcat等应用服务器的连接线程池实际上是有限制的;每一个连接请求都会耗掉线程池的一个连接数;如果某些耗时很长的操作,如对大量数据的查询操作、调用外部系统提供的服务以及一些 IO 密集型操作等,会占用连接很长时间,这个时候这个连接就无法被释放而被其它请求重用。如果连接占用过多,服务器就很可能无法及时响应每个请求;极端情况下如果将线程池中的所有连接耗尽,服务器将长时间无法向外提供服务
在常规场景中,客户端需要等待服务器处理完毕后返回才能继续进行其它操作,这个场景下每一步都是同步调用,如客户端调用 Servlet 后需要等待其处理返回,Servlet 调用具体的 Controller 后也需要等待其返回。这种情况是在服务器端开发中最常见的场景,适合于服务器端处理时间不是很长的情况;默认情况下 Spring 的 Controller 提供的就是这样的服务
当某项服务处理时间过长时,如邮件发送,需要调用到外部接口,处理时间不受调用方的控制,因此如果耗时过长会有两个比较严重的后果:一是如上文所说的会长时间的占用请求连接数,严重时有可能导致服务器失去响应; 二是客户端等待时间过长,导致前端应用的用户友好性下降,而且客户很有可能因为长时间得不到服务器响应而重复操作,从而加重服务器的负担,应用崩溃的机率变大!
为应对这种场景,一般会启用一个后台的线程池,处理请求的 Controller 会先提交一个耗时长操作如邮件发送到线程池中,然后立即返回到前台。因此处理响应的主线程耗时变短,客户感受到的就是在点击某个发送按钮后很快就得到服务器反馈结果,然后就放心的继续处理其它工作。实际上邮件发送这种事情延迟几秒对于客户来说根本感受不到。当然应用需要保证提交到线程池中的任务执行成功,或者是执行失败后在前端某个地方能够看到失败的具体情况
这种场景在 Spring 中可使用 TaskExecutor 或者是 Async 来处理,关于它们的用法请参考:Spring 基础学习-任务执行(TaskExecutor及Async)
通过以上两种场景,很容易就会想到,如果某个操作既耗时很长,客户端又必须要等待其返回才能进一步处理时,应该通过什么方式来处理?Servlet3.0 中引入异步请求处理来处理这种场景,相应的,Spring在3.2 版本中就引入相关机制来使用Servlet的该特性

SpringMVC 异步处理概述

为满足耗时任务占用应用服务器连接数,而客户端又必须等待这些耗时长任务返回才能处理下一步工作的场景,Spring 引入了以下机制来处理:

使用 Callable 或者 DeferredResult 作为 Controller 的返回值,能够处理异步返回单个结果的场景;使用 ResponseBodyEmitter/SseEmitter 或者StreamingResponseBody 来流式处理多个返回值;在 Controller 中使用响应式客户端调用服务并返回响应式的数据对象

Callable
Callable 直接使用在 Controller 中被 RequestMapping 所注解的方法上,做为其返回对象

使用示例

@RequestMapping("/testCallable")
public Callable < String > testCallable() {logger.info("Controller开始执行!");Callable < String > callable = () - > {Thread.sleep(5000);logger.info("实际工作执行完成!");return "succeed!";};logger.info("Controller执行结束!");return callable;
}

可以看到以下结果:

浏览器等待了大约5秒后返回结果

打印日志中,Controller 在 6ms 就执行结束

打印日志中,实际的任务执行在一个名称为 MvcAsync1 的线程中执行,并且在 Controller 执行完5s后才执行结束

因此可以得到结论:

返回 Callable 对象时,实际工作线程会在后台处理,Controller 无需等待工作线程处理完成,但 Spring 会在工作线程处理完毕后才返回客户端

它的执行流程是这样的

客户端请求服务

SpringMVC 调用 Controller,Controller 返回一个 Callback 对象

SpringMVC 调用 request.startAsync 并且将Callback提交到 TaskExecutor 中去执行

DispatcherServlet 以及 Filters 等从应用服务器线程中结束,但 Response 仍旧是打开状态,也就是说暂时还不返回给客户端

TaskExecutor 调用 Callback 返回一个结果,SpringMVC 将请求发送给应用服务器继续处理

DispatcherServlet 再次被调用并且继续处理 Callback 返回的对象,最终将其返回给客户端

DeferredResult
DeferredResult 使用方式与 Callable 类似,但在返回结果上不一样,它返回的时候实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到 DeferredResult 中去

该类包含以下日常使用相关的特性:

超时配置:通过构造函数可以传入超时时间,单位为毫秒;因为需要等待设置结果后才能继续处理并返回客户端,如果一直等待会导致客户端一直无响应,因此必须有相应的超时机制来避免这个问题;就算不设置这个超时时间,应用服务器或者 Spring 也会有一些默认的超时机制来处理这个问题

结果设置:它的结果存储在一个名称为result的属性中;可以通过调用 setResult 的方法来设置属性;由于这个 DeferredResult 天生就是使用在多线程环境中的,因此对这个result属性的读写是有加锁的

接下来将对DeferredResult的处理流程进行说明,并实现一个较为简单的示例

DeferredResult 处理流程

DeferredResult 的处理过程与Callback类似,不一样的地方在于它的结果不是 DeferredResult 直接返回的,而是由其它线程通过同步的方式设置到该对象中,它的执行过程如下所示:

客户端请求服务

SpringMVC 调用 Controller,Controller 返回一个 DeferredResult 对象

SpringMVC 调用 request.startAsync

DispatcherServlet 以及 Filters 等从应用服务器线程中结束,但 Response 仍旧是打开状态,也就是说暂时还不返回给客户端

某些其它线程将结果设置到 DeferredResult 中,SpringMVC 将请求发送给应用服务器继续处理

DispatcherServlet 再次被调用并且继续处理 DeferredResult 中的结果,最终将其返回给客户端

DeferredResult 使用示例

本示例将在一个 Controller 中添加两个 RequestMapping 注解的方法,其中一个返回的是 DeferredResult 的对象,另外一个设置这个对象的值

注意:每个请求都要返回一个 新的 DeferredResult 对象,不能设置成单例,不能多个方法公用一个 DeferredResult

@RestController
@RequestMapping("/test")
public class TestController {private DeferredResult<String> deferredResult;/*** 返回DeferredResult对象*/@RequestMapping("/getDeferredResult")public DeferredResult <String> testDeferredResult() {deferredResult = new DeferredResult < > (5000 L, "请求失败(超时)"); // 设置 5 秒超时return deferredResult;}/*** 对DeferredResult的结果进行设置*/@RequestMapping("/setDeferredResult")public String setDeferredResult() {deferredResult.setResult("success");return "succeed";}
}

第一步先访问:http://localhost/test/getDeferredResult 此时客户端将会一直等待,5秒后会返回 “请求失败(超时)” 提醒

第二步新开页面访问:http://localhost/test/setDeferredResult 此时第一个页面会返回结果(第一个页面还没有返回超时错误的情况下)

上面只是演示了如何使用 DeferredResult 实现异步方法,我们发现,对于 /getDeferredResult 这个请求,每次都返回了新的 DeferredResult 对象,那么如何确保多个请求的时候,/setDeferredResult 能设值到对应的 DeferredResult?在实际开发中,需要对 DeferredResult 进行封装,一个简单的实现方式是把每次创建的新的 DeferredResult 放到 Queue(队列) 中,队列具有从尾部添加新元素,从头部获取新元素的特性,正好可以确保DeferredResult 的使用顺序,即先创建的先使用,后创建的后使用;参考示例代码如下

DeferredResult 封工具类

public class DeferredResultQueue {// 创建一个线程安全的链式队列private static Queue < DeferredResult > queue = new ConcurrentLinkedDeque < > ();// 添加 DeferredResult 方法, 从尾部添加public static void set(DeferredResult < Object > deferredResult) {queue.add(deferredResult);}// 获取并删除队列第一个 DeferredResult 对象public static DeferredResult < Object > get() {return queue.poll();}
}

所以上面的示例代码可修改为

@RestController
@RequestMapping("/test")
public class TestController {/*** 返回DeferredResult对象*/@RequestMapping("/getDeferredResult")public DeferredResult < String > testDeferredResult() {DeferredResult < String > deferredResult = new DeferredResult < > (5000 L, "请求失败(超时)"); // 设置 5 秒超时DefferredResultQueue.set(deferredResult); // 把 new 出来的 DeferredResult 对象放到工具类队列容器中return deferredResult;}/*** 对DeferredResult的结果进行设置*/@RequestMapping("/setDeferredResult")public String setDeferredResult() {DeferredResult < String > deferredResult = DefferredResultQueue.get(); // 获取到对应顺序的 DeferredResult 对象deferredResult.setResult("success");return "succeed";}
}

SseEmitter
Callback 和 DeferredResult 用于设置单个结果,如果有多个结果需要返回给客户端时,可以使用 SseEmitter 以及 ResponseBodyEmitter 等;

下面直接看示例,与 DeferredResult 的示例类似:

@RestController
@RequestMapping("/test")
public class TestController {private static final Logger logger = LoggerFactory.getLogger(TestController.class);private SseEmitter sseEmitter;/*** 返回SseEmitter对象*/@RequestMapping("/testSseEmitter")public SseEmitter testSseEmitter() {sseEmitter = new SseEmitter(10000 L); // 设置 10 秒超时return sseEmitter;}/*** 向SseEmitter对象发送数据*/@RequestMapping("/setSseEmitter")public String setSseEmitter() {try {sseEmitter.send(System.currentTimeMillis());} catch (IOException e) {logger.error("IOException!", e);return "error";}return "Succeed!";}/*** 将SseEmitter对象设置成完成*/@RequestMapping("/completeSseEmitter")public String completeSseEmitter() {sseEmitter.complete();return "Succeed!";}
}

第一步访问:http://localhost/test/testSseEmitter

第二步连续访问:http://localhost/test/setSseEmitter,第一步会不停返回第二步设置的值

第三步访问:http://localhost/test/completeSseEmitter

可以看到结果,只有当第三步执行后,第一步的访问才算结束

注意

(1)如果第一步请求时间已经到了,第二步没有设值,那么第一步所在请求会抛出异常

(2)如果第一步请求时间已经到了,第二步还在继续设值,那么第二步所在请求会抛出 ResponseBodyEmitter is already set complete 异常

(3)如果第三步执行,表示 sseEmitter 已被关闭,第二步执行就会抛出 ResponseBodyEmitter is already set complete 异常

StreamingResponseBody
用于直接将结果写出到 Response 的 OutputStream 中; 如文件下载等(可尝试文件下载进度实现),优点是不用考虑什么时候关闭流(实际上也根本无法确认什么时候操作完成)示例

@GetMapping("/download")
public StreamingResponseBody handle() {return new StreamingResponseBody() {@Overridepublic void writeTo(OutputStream outputStream) throws IOException {// write...}}
}

异步处理拦截器
在进行异步处理时,可以使用 CallableProcessingInterceptor 来对 Callback返回参数的情况进行拦截,也可以使用 DeferredResultProcessingInterceptor 来对 DeferredResult 的情况进行拦截。 也可以直接使用 AsyncHandlerInterceptor

拦截器的使用与普通拦截器一样,因此此处不再展开;具体可以参考:Spring Boot 拦截器示例及源码原理分析
注意:在使用异步机制前,要在 Web 配置,向 Servlet 添加对异步方法的支持

// 相当于配置 web.xml 
public class WebInitializer implements WebApplicationInitializer {public void onStartup(ServletContext servletContext) throws ServletException {AnnotationConfigWebApplicationContext ctx = new AnnotationConfigWebApplicationContext();ctx.register(SpringConfig.class);ctx.setServletContext(servletContext);Dynamic servlet = servletContext.addServlet("dispatcher", new DispatcherServlet(ctx));servlet.addMapping("/");servlet.setLoadOnStartup(1);//开启异步方法支持servlet.setAsyncSupported(true);}
}