并发编程 | JDK 1.8 中的 CompletableFuture | FRP风格

在异步编程中,Future/Promise模式是一种广泛使用的异步开发模式,其中 Future 对象代表一个尚未完成异步操作的结果。从JDK 1.5以来,JUC包一直提供着最基本的Future,不过它太鸡肋了,除了getcancelisDoneisCancelled方法之外就没有其他的操作了,这样很不方便。好在JDK 1.8中引入了具有FRP风格的 CompletableFuture,它类似于Scala中的 FutureCompletableFuture 属于Monad, 因此支持一系列的函数式的组合、运算操作,非常方便,可以写出很FRP风格的代码而摆脱callback hell。

下面我们来结合FRP的思想,总结一下这些操作(有的时候为了方便表示,我会用Haskell或Scala的语法来表示类型,毕竟Java的类型系统太渣):

构造CompletableFuture对象

CompletableFuture类通过工厂模式创建CompletableFuture对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}

如果我们的异步操作不需要返回值,那么可以通过runAsync方法提供一个Runnable创建一个CompletableFuture<Void>对象。如果我们的异步操作需要返回值,那么可以通过supplyAsync方法提供一个Supplier<U>对象来创建:

1
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> longTask(param));

如果不提供Executor的话,默认使用ForkJoinPool.commonPool()作为线程池。

后缀为Async的方法代表异步执行。

变换(fmap)

假如我们要通过CompletableFuture来异步获取一组数据,并对数据进行一些处理(变换),我们可以使用thenApplythenApplyAsync方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}

它其实就是fmap函数,用Haskell表示原型为:

1
2
3
4
-- 对比一下fmap
fmap :: Functor f => (a -> b) -> f a -> f b
thenApply :: (a -> b) -> CompletableFuture a -> CompletableFuture b

它们不仅可以变换数据的值,也可以变换数据的类型,如:

1
2
3
CompletableFuture<Double> f = CompletableFuture.supplyAsync(() -> "4")
.thenApply(Integer::parseInt)
.thenApply(r -> r * r * Math.PI);

fmap以后,数据流的类型进行了以下变换:String -> Integer -> Double

组合(bind)

有的时候,我们需要在异步操作完成的时候对异步操作的结果进行一些操作,并且操作仍然返回CompletableFuture类型。我们可以利用thenCompose方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}

可以看出它其实对应了Monad里的bind操作(Java和Scala中为flatMap),用Haskell表示原型为:

1
2
3
(>>=) :: Monad m => m a -> (a -> m b) -> m b
thenCompose :: CompletableFuture a -> (a -> CompletableFuture b) -> CompletableFuture b

thenCompose是一个非常重要的操作,它对于构建异步的pipeline非常有用。举个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TaskWorkI {
public static Optional<List<Integer>> longTask(Integer i) {
if (i > 0) {
List<Integer> list = new ArrayList<>();
for(int pc = 0; pc < i; pc++)
list.add(pc);
return Optional.of(list);
}
else
return Optional.empty();
}
public static CompletableFuture<Long> getResultFuture(Optional<List<Integer>> op) {
return CompletableFuture.supplyAsync(() -> {
if (op.isPresent())
return op.get().stream()
.map(Integer::toUnsignedLong)
.reduce(0L, (x, y) -> x + y);
else
return -1L;
});
}
public static void main(String[] args) throws Exception {
CompletableFuture<Long> f = CompletableFuture.supplyAsync(() -> longTask(1000000))
.thenComposeAsync(TaskWorkI::getResultFuture);
Long result = f.get();
System.out.println(result);
}
}

超级变换(liftM2)

CompletableFuture类里面还有个thenCombine操作,它的原型看起来非常晕:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(null, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn) {
return biApplyStage(asyncPool, other, fn);
}
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}

Java类型系统过于坑爹,我们用Haskell表示其原型就一目了然了:

1
thenCombine :: CompletableFuture a -> CompletableFuture b -> (a -> b -> c) -> CompletableFuture c

把参数调调位置,可以发现thenCombine其实类似于Haskell中的liftM2操作:

1
2
3
liftM2 :: Monad m => (a1 -> a2 -> r) -> m a1 -> m a2 -> m r
thenCombine :: CompletableFuture m => (a -> b -> c) -> m a -> m b -> m c

简单示例

下面我们用一个简单的例子来说明CompletableFuture的使用。假设我们需要获取一篇文章(Article)的信息、对应分类(Category)信息以及对应的评论数,而且从数据库中query的操作是异步的(每个DB操作都返回一个CompletableFuture),我们可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Article {}
class Category {}
class ArticleWithCategory {
private Article article;
private Category category;
public ArticleWithCategory(Article article, Category category) {
this.article = article;
this.category = category;
}
}
class AwcWithCount {
private ArticleWithCategory awc;
private int count;
public AwcWithCount(ArticleWithCategory awc, int count) {
this.awc = awc;
this.count = count;
}
}
public CompletableFuture<ArticleWithCategory> fetchAWC(int aid) {
// 从数据库中异步获取文章信息与分类信息
}
public CompletableFuture<Integer> getCount(int aid) {
// 从数据库中异步获取评论数
}
public CompletableFuture<AwcWithCount> fetchWithAWCC(int aid) {
return fetchAWC(aid).thenCompose(x -> // flatMap
getCount(aid).thenApply(y -> // map
new AwcWithCount(x, y)
));
}

这其实和Scala中的Slick的各种组合特别相似:

1
2
3
4
5
6
7
8
9
10
11
12
13
def fetchWithAWCC(aid: Int): Future[Option[(Article, Category, Int)]] = {
db.run((for {
a <- articles if _.aid === aid
c <- categories if _.cid === a.cid
} yield(a, c)).result.headOption) flatMap {
case Some(a, c) =>
db.run(comments.filter(_.aid === aid).length) map { case res =>
Some(a, c, res)
}
case None =>
Future(None)
}
}

Rx中同样也提供了类似的组合操作,而且更为丰富。

文章目录
  1. 1. 构造CompletableFuture对象
  2. 2. 变换(fmap)
  3. 3. 组合(bind)
  4. 4. 超级变换(liftM2)
  5. 5. 简单示例