CompletableFuture异步开发类全面解析

CompletableFutureJava 8引入的,一个很好用的特性,但被LambdaStream等的光环给盖住了。

总的来说,CompletableFuture把Java 5引入的Future的锅都补上了,具体直接看例子吧。


例子说明

本文所述的例子纯属虚构,如有雷同,纯属巧合(可联系作者删除),版权归作者所有。
例子主要服务于问题/原理讲解,以通熟易懂为主。可能与现实或常识脱钩,请别太较真。

这是一个虚构的订单分拣的例子:

  • 一个订单只包括一个商品,数量不限(>=1)
  • 4个仓库中心(010,020,021,023),包含每个商品在具体哪个货架多少数量等明细
  • 仓库不总是可用的(譬如错峰放假、运送区域不支持等)
  • 有一个全局的库存汇总(无明细,可快速查询全国某商品的总数)
  • 为了提高分库扣减库存的有效性
  • 先查询总库,只有总库存数量足够,才开始进行分库库存分拣
  • 在分拣的同时,订单可能被撤销。一旦撤销,本次分拣结束
  • 为使代码简单
  • 库存分拣只是查询分库库存
  • 库存回撤没有任何实现
  • 实现是有状态的,所以不是线程安全的

处理流程

彩色文字是后面对应的代码调用。看代码的时候可以结合流程图加深理解。

状态变化

大概的接口定义(延迟模拟远程调用):

  • int StockService.query(String product): 查总库库存 (快速
  • 随机延迟100~200ms
  • int StockService.pick(String repo, String product): 查/锁定分库库存(含货架)
  • 随机延迟500~2500ms
  • Availability RepoService.isAvailable(String repo): 查分库是否可用 (快速
  • 随机延迟100ms
  • void EventService.listenOrderCancel(String order): 监听订单取消事件
  • 延迟2000ms
  • boolean PackageService.pack(String oid, String pid): 分拣订单
  • 分成2个子类SingleRepoPackageService和MultiRepoPackage类分别演示单仓库和多仓库的实现
  • 共用代码存放入AbstractRepoPackageService

  • 对象模型

例子实战 - 单仓库

文末有完整代码

流程总体控制

几个阀门(CompletableFuture,下简称CF)来控制什么时候终止流程:

/* 监听订单取消事件 */
CompletableFuture<Boolean> cancelListener =
        runAsync(() -> eventService.listenOrderCancel(oid))
                .thenApply(reason -> false);

/* 分拣结束标记 */
CompletableFuture<Boolean> allocated = new CompletableFuture<>();

/* 总开关(权衡订单取消和分拣结束) */
CompletableFuture<Boolean> terminator =
        allocated.applyToEitherAsync(cancelListener, Function.identity());

监听订单取消事件

使用CF.runAsync创建CF实例,并在收到信号后回调(thenApply)声明分拣失败

分拣结束标记

直接创建一个新实例,其结束将由后面的分拣流程控制(直接调用其complete(boolean)

总开关

使用CF的二选一(applyToEitherAsync)来创建一个新的总控CF:即任何一个CF执行完毕,这个总控就结束。

分拣

supplyAsync(() -> stockService.query(pid))
    .whenComplete((stock, e) -> {
        if (e != null) {
            e.printStackTrace();
            allocated.complete(false);
        } else if (stock < q) {
            logger.log("not enough stock (%d < %d)", stock, q);
            allocated.complete(false);
        } else {
            logger.log("total stock is enough (%d >= %d). " + 
                        "allocating from repos...", stock, q);
            startPick(pid);
        }
    });

先检查总库存(快速)

使用CF.supplyAsync(Supplier<Integer>)开启异步总库存查询。

根据库存情况

whenComplete(BiConsumer<Integer, Throwable>)是其执行结果的回调入口,根据库存情况:

  • 不足,标识分拣以失败结束(allocated.complete(false))(级联触发总开关结束)
  • 充足,开始分库分拣(startPick()
  • 异常处理,标记失败结束(allocated.complete(false))(级联触发总开关结束)

分库分拣

supplyAsync(() -> repoService.isAvailable(repo))
    .thenCompose(this::pick)
    .thenAccept(this::allocateStock)
    .thenRun(this::isAllocatedFully)  
    // <-- 到这里返回就是startPickFromOneRepo(),后面多仓库会调用
    .whenComplete(this::completeAllocate);

检查分库的可用性

supplyAsync(Supplier<Availability>)创建CF实例,检查分库的可用性

分拣

thenCompose(Function<Availability, Stock>)回调,根据分库的可用性:

  • 不可用,直接创建一个空的CF<Stock>,标识数量为0
CompletableFuture<Stock> dummy = new CompletableFuture<Stock>();
dummy.complete(new Stock(a.repo, 0));
return dummy;
  • 可用,创建新的查询库存的CF<Stock> (supplyAsync(Supplier<Stock>))
  • return supplyAsync(() -> stockService.pick(a.repo, pid));

分配库存

thenAccept(Consumer<Stock>)回调,执行实际的库存分配

检查是否库存分配达标

thenRun(Runnable)回调,如果达标,allocated.complete(true)标记分拣以成功结束(级联触发总开关结束)

打完收工

whenComplete(BiConsumer<Void, Throwable>)收尾,处理分配标识还没结束的case(前面库存分配达标时,会标识结束)。这种遗漏的情况标记分拣以失败结束(级联触发总开关结束)

返回结果

return terminator.get(5, TimeUnit.SECONDS);

例子实战 - 多仓库

final CompletableFuture[] queries = new CompletableFuture[repos.size()];
final Iterator<String> iter = repos.iterator();
for (int i = 0; i < queries.length; i++) {
    queries[i] = startPickFromOneRepo(iter.next(), pid);
}

allOf(queries).whenComplete(this::completeAllocate);

基本和单仓库类似,主要是循环让单仓库的分拣并行跑。

allOf(CF...):必须所有的CF都完成了,才开始回调whenComplete(BiConsumer<Void, Throwable>)。注意之前的流程图和单仓库处理逻辑,中间:

  • 库存分配满额/达标,会提前标识成功分拣结束(其实其他仓库的分配还在并行进行,实际代码需要注意中断/或防止额外再分配库存)

总结

在整个例子中,

创建/开启CF实例

用到的:

  • CF.supplyAsync(Supplier<T>)
  • CF.runAsync(Runnable)
  • new CF()

遗漏的:

  • handleAsync(Function<T, U>)

关于回调

用到的:

  • thenApply(Function<T, U>)
  • whenComplete(BiConsumier<T, Throwable>)
  • thenCompose(Function<T, U>)
  • thenAccept(Consumer<T>)
  • thenRun(Runnable)

遗漏的:

都在了

关于二合一或二选一

用到的:

  • applyToEitherAsync(CF, Function<T, U>)

遗漏的:

  • acceptEitherAsync(CF, Consumer<T>)
  • runAfterBothAsync(CF, Runnable)
  • runAfterEitherAsync(CF, Runnable)
  • thenAcceptBothAsync(CF, BiConsumer<T, U>)
  • thenCombineAsync(CF, BiFunction<T, U, V)

全部或任一

用到的:

  • allOf(CF...)

遗漏的:

都在了

状态检查

用到的:

  • isDone()

遗漏的:

  • isCancelled()
  • isCompletedExceptionally()

手动标识完成/或异常/取消

用到的:

  • complete(T)

遗漏的:

  • completedFuture(U)
  • completeExceptionally(Throwable)
  • exceptionally(Function<Throwable, T>)
  • obtrudeException(Throwable)
  • obtrudeValue(T)

等待/阻塞

用到的:

  • get

遗漏的:

  • getNow
  • join

CF的接口设计是很有规律的,譬如:

  • 一个方法xxx,其相应的会有xxxAsync以及xxxAsync(.., Executor)的变体
  • 建议使用Executor的那个,可以避免ForkJoinPool.commonPool()堆积问题(有空会单独成篇讲一下)
  • 参数支持了Function<T, U>的,必然有其他类似但接受类型为Consumer<T>Runnable

理解了一种,对接受其他几个变体应该难度不大。

我们这个例子涵盖了每个大类的1~N个方法,对于CF的使用大概就应该是这样了。

调用套路

  • 起手:supplyAsync或其他
  • 序盘:thenXxx或其他的多个串行回调
  • 中盘:二合一或二选一thenCombineAsync或其他
  • 官子:全部allOf + 阻塞get等

注意:不是所有的问题都需要走完整的套路的。

回答开头的问题:

什么高深的道理,一篇好的例子都能讲透;如果不行,那就两篇。如果还不行:

放出所有源代码(guava是唯一的外部依赖)。注意,由于这里有很多随机数,请多跑几遍,应该能跑出所有的可能:

  • 总的库存不足,分配失败
[ 109ms] initialized repos: [023, 020, 010, 021]
[  11ms] pre-checking stock quickly...
[ 135ms] stocks: [total=3, repos={023=0, 020=0, 010=1, 021=2}]
[ 141ms] not enough stock (3 < 5)
[ 142ms] allocated: false
  • 订单无故被取消
[ 110ms] initialized repos: [023, 020, 010, 021]
[  14ms] pre-checking stock quickly...
[ 149ms] stocks: [total=7, repos={023=1, 020=1, 010=2, 021=3}]
[ 158ms] total stock is enough (7 >= 5). allocating from repos...
[ 165ms] repo 021 NOT available
[ 165ms] repo 021 was allocated 0
[ 194ms] repo 020 is available
[ 250ms] repo 010 is available
[ 255ms] repo 023 is available
[1336ms] repo 020 was allocated 1
[2027ms] cancelled with no reason
[2028ms] allocated: false
  • 总库存足,但分库的分配没有满额
[ 126ms] initialized repos: [023, 020, 010, 021]
[  27ms] pre-checking stock quickly...
[ 194ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}]
[ 203ms] total stock is enough (6 >= 5). allocating from repos...
[ 259ms] repo 023 is available
[ 288ms] repo 020 is available
[ 291ms] repo 021 NOT available
[ 292ms] repo 021 was allocated 0
[ 294ms] repo 010 is available
[1355ms] repo 020 was allocated 1
[1664ms] repo 023 was allocated 1
[1930ms] repo 010 was allocated 2
[1930ms] didn't get enough stock.
[1930ms] allocated: false
  • 分配成功
[ 104ms] initialized repos: [023, 020, 010, 021]
[  13ms] pre-checking stock quickly...
[ 206ms] stocks: [total=6, repos={023=1, 020=1, 010=2, 021=2}]
[ 213ms] total stock is enough (6 >= 5). allocating from repos...
[ 237ms] repo 020 is available
[ 238ms] repo 023 is available
[ 241ms] repo 010 is available
[ 284ms] repo 021 is available
[ 937ms] repo 021 was allocated 2
[1052ms] repo 020 was allocated 1
[1278ms] repo 023 was allocated 1
[1718ms] repo 010 was allocated 2
[1719ms] 6 >= 5
[1719ms] allocated: true
代码版权归作者,仅非商业使用时可无需作者授权即可使用(非常感谢使用时标注来源)。

希望这篇博文能对你有所帮助,喜欢的话点个赞吧!


PackageService.java

package cf;
/**
 * @author zhhe.me@gmail.com
 * @since 10/9/2018
 */
public interface PackageService {
    boolean pack(String oid, String pid);
}

AbstractPackageService.java

package cf;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static java.util.concurrent.CompletableFuture.*;
import static cf.Util.*;

/**
 * Not ThreadSafe and just for demo.
 *
 * @author zhhe.me@gmail.com
 * @since 10/9/2018
 */
public abstract class AbstractPackageService implements PackageService {

    protected StockService stockService = new StockService();
    protected RepoService repoService = new RepoService();
    protected EventService eventService = new EventService();

    /* stateful variables. */
    protected AtomicInteger queriedQ = new AtomicInteger(0);
    protected CompletableFuture<Boolean> allocated = new CompletableFuture<>();

    @Override
    public boolean pack(String oid, String pid) {
        /* set global repos since it's used by single repo + multi repo demos. */
        repos.clear();
        repos.addAll(getRepos());
        logger.log("initialized repos: %s", repos);

        /* set started time. */
        logger.start();

        /* a listener to monitor if this order's cancellation event was emitted. */
        final CompletableFuture<Boolean> cancelListener =
                runAsync(() -> eventService.listenOrderCancel(oid))
                        .thenApply(reason -> false);

        /* a control to indicate if package was completed. */
        final CompletableFuture<Boolean> terminator =
                allocated.applyToEitherAsync(cancelListener, Function.identity());

        logger.log("pre-checking stock quickly...");
        supplyAsync(() -> stockService.query(pid))
                .whenComplete((stock, e) -> {
                    if (e != null) {
                        e.printStackTrace();
                        allocated.complete(false);
                    } else if (stock < q) {
                        logger.log("not enough stock (%d < %d)", stock, q);
                        allocated.complete(false);
                    } else {
                        logger.log("total stock is enough (%d >= %d). allocating from repos...", stock, q);
                        startPick(pid);
                    }
                });

        try {
            // wait until package was completed.
            return terminator.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /** repos used to initialize global repos and generate stocks. */
    protected abstract Collection<String> getRepos();

    /** the entry to kick off pick process. */
    protected abstract void startPick(String pid);

    /** a process to pick up stock from one repo. */
    protected CompletableFuture<Void> startPickFromOneRepo(String repo, String pid) {
        return supplyAsync(() -> repoService.isAvailable(repo))
                .thenCompose(this::pick)
                .thenAccept(this::allocateStock)
                .thenRun(this::isAllocatedFully)
        ;
    }

    /** pick up stock based on repo's availability. */
    protected CompletableFuture<Stock> pick(RepoService.Availability a) {
        if (!a.available) {
            CompletableFuture<Stock> dummy = new CompletableFuture<Stock>();
            dummy.complete(new Stock(a.repo, 0));
            return dummy;
        } else {
            return supplyAsync(() -> stockService.pick(a.repo, pid));
        }
    }

    /** allocate stock. */
    protected void allocateStock(Stock stock) {
        queriedQ.addAndGet(stock.count);
        logger.log("repo %s was allocated %d", stock.repo, stock.count);
    }

    /** check if all stocks are allocated enough. If yes, stop process. */
    protected boolean isAllocatedFully() {
        final int i = queriedQ.get();
        if (i >= q) {
            logger.log("%d >= %d", i, q);
            allocated.complete(true);
        }
        return i >= q;
    }

    /** complete allocation process. */
    protected void completeAllocate(Void v, Throwable e) {
        if (e != null) {
            e.printStackTrace();
        }else if (!allocated.isDone()) {
            allocated.complete(false);
            logger.log("didn't get enough stock.");
        }
    }
}

SingleRepoPackageService.java

package cf;

import com.google.common.collect.ImmutableSet;
import java.util.Collection;

import static cf.Util.*;
/**
 * Not ThreadSafe and just for demo.
 *
 * @author zhhe.me@gmail.com
 * @since 10/9/2018
 */
public class SingleRepoPackageService extends AbstractPackageService {

    private final String repo = "021";

    public static void main(String... args) throws Exception {
        final boolean result = new SingleRepoPackageService().pack(oid, pid);
        logger.log("allocated: %s", result);
    }

    @Override
    protected Collection<String> getRepos() {
        return ImmutableSet.of(repo);
    }

    protected void startPick(String pid) {
        startPickFromOneRepo(repo, pid).whenComplete(this::completeAllocate);
    }
}

MultiRepoPackageService.java

package cf;

import com.google.common.collect.ImmutableSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

import static cf.Util.*;
import static java.util.concurrent.CompletableFuture.allOf;
/**
 * Not ThreadSafe and just for demo.
 *
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
public class MultiRepoPackageService extends AbstractPackageService {

    public static void main(final String... args) throws Exception {
        final boolean result = new MultiRepoPackageService().pack(oid, pid);
        logger.log("allocated: %s", result);
    }

    @Override
    protected Collection<String> getRepos() {
        return ImmutableSet.of("010", "020", "021", "023");
    }

    @Override
    protected void startPick(String pid) {
        final CompletableFuture[] queries = new CompletableFuture[repos.size()];
        final Iterator<String> iter = repos.iterator();
        for (int i = 0; i < queries.length; i++) {
            queries[i] = startPickFromOneRepo(iter.next(), pid);
        }

        allOf(queries).whenCompleteAsync(this::completeAllocate);
    }
}

Util.java

package cf;

import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.time.LocalTime;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
/**
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
public interface Util {

    Logger logger = Logger.getInstance();
    int q = 5;
    Set<String> repos = new HashSet<>();
    String oid = "jianshu";
    String pid = "Samsung S10";
    Random r = new SecureRandom(ByteBuffer.allocate(4).putInt(LocalTime.now().getNano()).array());

    static void delay(int base, int random) {
        try {
            Thread.sleep(base + r.nextInt(random));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Logger.java

package cf;

/**
 * @author zhhe.me@gmail.com.
 * @since 9/9/2018
 */
public class Logger {

    private static final Logger INSTANCE = new Logger();

    private long started;

    private Logger() {
        started = System.nanoTime();
    }

    static Logger getInstance() {
        return INSTANCE;
    }

    Logger start() {
        started = System.nanoTime();
        return this;
    }

    void log(String s, Object... args) {
        if (args==null)
            args = new Object[0];

        final String formatS = "[%4dms] " + s + "%n"; //+ "\t<<<%s>>>%n";
        final int argLength = args.length + 2;


        final Object[] args2 = new Object[argLength];
        args2[0] = (System.nanoTime()-started)/1_000_000;
        System.arraycopy(args, 0, args2, 1, args.length);
        args2[argLength-1] = Thread.currentThread().getName();

        System.out.format(formatS, args2);
    }
}

EventService.java

package cf;

import static cf.Util.*;
/**
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
public class EventService {

    public void listenOrderCancel(String order) {
        delay(2000, 300);
         logger.log("cancelled with no reason");
    }
}

RepoService.java

package cf;

import static cf.Util.*;
/**
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
public class RepoService {

    public Availability isAvailable(String repo) {
        delay(0, 100);
        final Availability availability = new Availability(repo, r.nextInt(5) != 0);
        logger.log("repo %s %s available", repo, availability.available ? "is" : "NOT");
        return availability;
    }

    public static class Availability {
        String repo;
        boolean available;

        public Availability(String repo, boolean available) {
            this.repo = repo;
            this.available = available;
        }
    }
}

Stock.java

package cf;

/**
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
class Stock {
    String repo;
    int count;

    public Stock(String repo, int count) {
        this.repo = repo;
        this.count = count;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Stock{");
        sb.append("repo='").append(repo).append('\'');
        sb.append(", count=").append(count);
        sb.append('}');
        return sb.toString();
    }
}

StockService.java

package cf;

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import static cf.Util.*;
/**
 * @author zhhe.me@gmail.com
 * @since 9/9/2018
 */
public class StockService {
    private Map<String, Integer> stocks = new HashMap<>();

    public int query(String prd) {
        delay(100, 100);
        int q2 = (q-q/4-1) + r.nextInt(q);
        generateStock(q2);
        return q2;
    }

    public Stock pick(String repo, String prd) {
        final Stock stock = new Stock(repo, stocks.get(repo));
        delay(500, 2000);
        return stock;
    }

    private void generateStock(int q) {
        final Iterator<String> iter = repos.iterator();
        if (repos.size() == 1) {
            stocks = ImmutableMap.of(iter.next(), q);
        } else {
            stocks = ImmutableMap.of(
                    iter.next(), q / 5,
                    iter.next(), q / 4,
                    iter.next(), q / 3,
                    iter.next(), (q - q / 5 - q / 4 - q / 3)
            );
        }
        logger.log("stocks: [total=%d, repos=%s]", q, stocks);
    }
}


评论区
Rick ©2018