异步编程的妙用

zxbandzby
9
2025-08-25

1. 体会异步并发的奥妙

以小红书笔详页面渲染为例,抽象一个简单的业务场景:

public NoteDetail getNodeDetail(String noteId) {
    Futrue<UserInfo> userFuture = threadPool.submit(() -> queryUserInfo(noteId));
    Futrue<NoteInfo> noteFuture = threadPool.submit(() -> queryNoteInfo(noteId));
    Futrue<CommentInfo> commentFuture = threadPool.submit(() -> queryCommentInfo(noteId));
    
    NodeDetail noteDeital = NoteDetail.build();
    try {
        UserInfo userInfo = userFuture.get(1L, TimeUnit.SECONDS);
        NoteInfo noteInfo = noteFuture.get(1L, TimeUnit.SECONDS);
        CommentInfo commentInfo = commentFuture.get(1L, TimeUnit.SECONDS);
        noteDetail.userInfo(userInfo).noteInfo(noteInfo).commentInfo(commentInfo).build();
    } catch (Exception e) {
        log.error("get note detail error, noteId:{}", noteId, e)
    }
    return noteDetail;
}

思考:
1. 这种写法还有优化的空间吗?或者它还有哪些不足?
2. 你用过CompletableFuture吗?日常开发中是怎么用的?

public NoteDetail getNodeDetail(String noteId) {
    CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> queryUserInfo(noteId), threadPool);
    CompletableFuture<NoteInfo> noteFuture = CompletableFuture.supplyAsync(() -> queryNoteInfo(noteId), threadPool);
    CompletableFuture<CommentInfo> commentFuture = CompletableFuture.supplyAsync(() -> queryCommentInfo(noteId), threadPool);
 
    return CompletableFuture.allOf(userFuture, noteFuture, commentFuture)
            .thenApply(v -> buildNoteDetail(userFuture.join, noteFuture.join, commentFuture.join))
            .exceptionally(ex -> {
                log.error("get note detail error, noteId:{}", noteId, ex);
                return NoteDetail.build().build();
            })
            .completeOnTimeout(NoteDetail.build().build(), 1, TimeUnit.SECONDS)
            .join();
}
 
private NoteDetail buildNotDetail(UserInfo user, NoteInfo note, CommentInfo comment) {
    return NoteDetail.build()
            .userInfo(user)
            .noteInfo(note)
            .commentInfo(comment)
            .build()
}

2. 为什么是CompletableFuture

2.1 Future的局限

a. 获取结果的方式不够友好

无法主动通知/监听结果,只能通过 get() 或 isDone() 主动查询,不能自动回调通知。get()是阻塞调用,在子线程获取结果之前,主线程会一直等在那里,这与Future的初衷——异步编程思想相违背;isDone()是通过不停轮询的方式获取结果,这会使CPU空转,浪费大量资源。

Future<String> future = executor.submit(() -> "Result");
// 同步阻塞等待
String result = future.get(); 
System.out.println(result);
 
// 非阻塞回调
CompletableFuture.supplyAsync(() -> "Result").thenAccept(result -> System.out.println("Callback: " + result)); 

b. 不支持链式调用和任务组合

对于一些复杂的任务关系不支持链式调用和任务组合,全靠手动实现,代码冗余,不利于维护和理解,比如:

1)A -> B -> C的场景

// 任务1:获取用户ID
Future<Integer> userIdFuture = executor.submit(() -> 1001);
// 阻塞等待第一个任务完成
Integer userId = userIdFuture.get(); 
// 任务2:根据ID查询用户名
Future<String> userNameFuture = executor.submit(() -> "User-" + userId);
// 再次阻塞
String userName = userNameFuture.get(); 
System.out.println(userName); // 输出:User-1001
 
// 无需阻塞等待,自动串联任务
CompletableFuture.supplyAsync(() -> 1001).thenApplyAsync(userId -> "User-" + userId).thenAcceptAsync(System.out::println);

2)A + B -> C的场景

Future<Integer> future1 = executor.submit(() -> 10);
Future<Integer> future2 = executor.submit(() -> 20);
// 阻塞等待所有任务完成
int sum = future1.get() + future2.get(); 
System.out.println("Sum: " + sum); // 输出:Sum: 30
 
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> 20);
cf1.thenCombine(cf2, Integer::sum).thenAccept(sum -> System.out.println("Sum: " + sum));

c. 异常处理不够优雅

Future需要在get()的时候主动获取异常并处理,无法在链中传递,这在流式编程风格下显得格格不入,也会导致有些写法特别冗余

Future<String> future = executor.submit(() -> {
    if (true) {
        throw new RuntimeException("Oops!");
    }    
    return "Success";
});
try {
    future.get(); // 异常在此处暴露
} catch (ExecutionException e) {
    System.out.println("Error: " + e.getCause()); // 需解包真实异常
}
 
CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("CompletableFuture Error!");
    }
    return "Success";
}).handle((result, ex) -> {
    if (ex != null) {
        return "Fallback Value"; // 提供降级结果
    }
    return result;
}).thenAccept(System.out::println); // 输出:Fallback Value

2.2 CompletableFuture简介

a. 设计思想与原理

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈的形式存储,stack表示栈顶元素。这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。

按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手:

被观察者:即当前正在执行的CompletableFuture

  • 其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。

  • 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象。

观察者:可以认为就是回调逻辑,即当前CompletableFuture执行完要去做的事,可以是多个CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。

  • 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。

  • 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。

  • 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。

b. 实战:响应式编程

背景:部门最近在做服务架构升级,要从响应式升级为分层架构,大概流程如下:

层与层之间是串行的,不同层之间一定会有策略依赖相同的外部数据,如果不做“去重”处理,会导致调下游服务QPS增加,而且可能是翻倍的增加,所以我们需要这样一种能力:在同一条请求中,多层中的相同的外部数据能够相互复用,只调一次下游就行

思考:如果是你,会怎么做?

ConcurrentHashMap<String, CompletableFuture<String>> cacheMap = new ConcurrentHashMap();
public void asyncQueryRpcFactors() {
        List<String> factors = Arrays.asList("factor1", "factor2", "factor1", "factor3");
        factors.forEach(factor -> {
            // 复用future,只有第一次会发起实际调用
            CompletableFuture<String> future = cacheMap.computeIfAbsent(
                    factor,
                    k -> CompletableFuture.supplyAsync(() -> invokerAsync(k))
            );
            // 所有回调注册都OK
            future.thenAccept(data -> doBusiness(factor, data));
        });
        // 等待所有异步结束,仅为演示
        cacheMap.values().forEach(f -> {
            try {
                f.get(1, TimeUnit.SECONDS);
            } catch (Exception ignored) {
                
            }
        });
    }
 
    private String invokerAsync(String factorId) {
        System.out.println("invoke=" + factorId);
        // 模拟异步调用返回值
        return "mockValue-" + factorId;
    }
 
    private void doBusiness(String factorId, String value) {
        // 模拟处理业务逻辑
        System.out.println("Processing business for " + factorId + " with value: " + value);
    }

3. 总结回顾

回顾一下,到底什么是CompletableFuture?或者说它有什么特点?适合在什么场景中使用?我总结了关于CompletableFuture的几个关键字:

  • 异步回调:不用傻傻的轮询或者同步阻塞获取异步结果,可以把回调逻辑注册进去,”响应式“执行,实现真正的异步!

  • 链式组合:通过CompletableFuture丰富的API可以实现各种各样的Task组合,且能够跟流式编程,Lambda表达式无缝衔接。

  • 优雅简洁:代码简洁,便于理解,比如优雅处理异常。


附1-CompletableFuture常用API

附2-关于CompletableFuture线程池的注意事项

上面提到的API接口都有可以传自定义线程池方法,这里强烈建议传自己定义的线程池,且根据实际情况做线程池隔离。当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

动物装饰