> 文章列表 > 请求合并(窗口时间)的几种实现方式RxJava/ExpiringMap/CompletableFuture

请求合并(窗口时间)的几种实现方式RxJava/ExpiringMap/CompletableFuture

请求合并(窗口时间)的几种实现方式RxJava/ExpiringMap/CompletableFuture

RxJava

RxJava是一种响应式编程库,它可以让开发者更加方便地处理异步事件流。RxJava提供了一整套操作符,可以将事件流转换、过滤、合并等等,使得开发者可以更加灵活地处理数据流。

简单实现了从外部传入request对象,subscribe进行订阅消费。
在start()方法中,使用PublishProcessor对象的window()方法,将异步请求流按照时间窗口分割成多个Observable对象。

使用flatMapSingle()方法,将每个时间窗口中的Request对象转换为一个Single对象,并将这些Single对象合并成一个Observable对象。

使用subscribeOn()方法,将Observable对象的处理过程切换到I/O线程中进行。
在subscribe()方法中,使用handleResult()方法对每个时间窗口中的最后一个Request对象进行处理,并输出处理结果。该逻辑可以用于实现一些需要按照时间窗口处理异步请求的场景,例如实现一个简单的日志上传器等。

class RequestManager {private PublishProcessor<Request> observable;public RequestManager() {observable = PublishProcessor.create();}public void addRequest(Request request) {System.out.println("add request: "+ request.toString());observable.onNext(request);}public void start() {observable.window(3, TimeUnit.SECONDS)//3秒为一个窗口.flatMapSingle(window -> window.toList().map(list -> ObjectUtils.isEmpty(list) ? null : list.get(list.size() -1))).subscribeOn(Schedulers.io()).subscribe(result -> handleResult(result), throwable -> {System.out.println(throwable.getMessage());});}private Object handleResult(Object result) {System.out.println("Handling result: " + result);return result;}}

start方法实现2, 先window后group,然后在group直接消费,返回一个Completable给订阅的subscribe,正常subscribe的需要处理消息(请求),这种情况就不需要处理,已经被前置处理过了

public void start() {observable.window(3, TimeUnit.MINUTES).flatMapSingle(window -> window.toList().map(list -> groupRequests(list))).flatMapCompletable(groupedObservable -> groupedObservable.flatMapCompletable(groupedFlowable -> groupedFlowable.toList().subscribeOn(Schedulers.io()).flatMapCompletable(result -> handleResult(result)))).subscribe();}

ExpiringMap

使用ExpiringMap实现的思路: 有消息(请求)进来,加入缓存,并设置过期时间为1分钟,缓存key用来区分用户,同一用户有消息,则覆盖缓存(时间顺延),如果1分钟之内,没有消息进来,则key会到期,则去执行一些对应的操作

代码实现起来实际上很简单,这里不贴代码了

ExpiringMap的过期机制: ExpiringMap会启动一个后台线程来处理过期时间队列,当队列中的时间戳小于等于当前时间时,会将对应的entry从ExpiringMap中删除,并触发key过期的监听器。ExpiringMap还提供了一些其他的功能,例如定时清理过期entry、自定义过期时间计算器、自定义entry过期回调等。这些功能都是基于ConcurrentHashMap和过期时间队列实现的。

CompletableFuture实现的窗口时间

以下是通过AI生成代码,然后也并没有去使用,但是有助于理解整体的实现思路. ( 前提是对CompletableFuture相对比较熟悉的情况下 )

class RequestManagerV1 {@Datastatic class Request{private String url;private int userId;private Date requestTime;public Request(int userId,String url){this.url = url;this.userId = userId;}public Request(){}}private List<Request> requestList;public RequestManagerV1() {requestList = new ArrayList<>();}public void addRequest(Request request) {System.out.println("add request: " + request.toString());requestList.add(request);}public void start() {CompletableFuture.supplyAsync(() -> {// 将请求按照一分钟的时间窗口分组Map<Integer, List<Request>> groupedRequests = groupRequests(requestList, 1, TimeUnit.MINUTES);// 处理每个分组内的请求结果return groupedRequests.entrySet().stream().map(entry -> CompletableFuture.supplyAsync(() -> handleGroupedRequests(entry.getValue()))//CompletableFuture.allOf方法来等待所有异步操作完成,触发完成通知.thenCompose(result -> CompletableFuture.allOf(result.toArray(new CompletableFuture[0])))).toArray(CompletableFuture[]::new);}).thenCompose(result -> CompletableFuture.allOf(result)).join();//join方法来等待这个CompletableFuture` 对象完成,以保证整个方法的执行顺序}/* 将请求按照一分钟的时间窗口分组* @param list* @param windowSize* @param timeUnit* @return*/private Map<Integer, List<Request>> groupRequests(List<Request> list, long windowSize, TimeUnit timeUnit) {Map<Integer, List<Request>> groupedRequests = new HashMap<>();long windowStart = System.currentTimeMillis();for (Request request : list) {long requestTime = request.getRequestTime().getTime();if (requestTime >= windowStart + timeUnit.toMillis(windowSize)) {windowStart = requestTime;}int userId = request.getUserId();groupedRequests.computeIfAbsent(userId, k -> new ArrayList<>()).add(request);}return groupedRequests;}/* 处理每个分组内的请求* @param list* @return*/private List<CompletableFuture<Void>> handleGroupedRequests(List<Request> list) {return list.stream().map(request -> CompletableFuture.runAsync(() -> handleRequest(request))).collect(Collectors.toList());}private void handleRequest(Request request) {System.out.println("Handling request: " + request.toString());// 处理请求的逻辑}
}

k歌软件下载