This post shows how Java 8's CompletableFuture compares with parallel streams when performing asynchronous computations.
We will use the following class to model a long-running task:
    class MyTask {
        // Task length in seconds
        private final int duration;

        public MyTask(int duration) {
            this.duration = duration;
        }

        public int calculate() {
            // Print the name of the thread running this task
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(duration * 1000);
            } catch (final InterruptedException e) {
                throw new RuntimeException(e);
            }
            return duration;
        }
    }
Let's create ten tasks, each with a duration of 1 second:
    List<MyTask> tasks = IntStream.range(0, 10)
            .mapToObj(i -> new MyTask(1))
            .collect(toList());
How can we calculate the list of tasks efficiently?
Approach 1: Sequentially
Your first thought might be to calculate the tasks sequentially, as follows:
    public static void runSequentially(List<MyTask> tasks) {
        long start = System.nanoTime();
        List<Integer> result = tasks.stream()
                .map(MyTask::calculate)
                .collect(toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
    }
As you might expect, this takes 10 seconds to run, because the tasks run one after another on the main thread.
Approach 2: Using a parallel stream
A quick improvement is to convert your code to use a parallel stream, as shown below:
    public static void useParallelStream(List<MyTask> tasks) {
        long start = System.nanoTime();
        List<Integer> result = tasks.parallelStream()
                .map(MyTask::calculate)
                .collect(toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
    }
The output is
    main
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-2
    main
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-1
    main
    Processed 10 tasks in 3043 millis
This time it took 3 seconds because up to 4 tasks ran in parallel (using three threads from the ForkJoinPool common pool, plus the main thread), so the 10 tasks finished in three rounds.
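The number of worker threads comes from the common pool's parallelism, which by default is one less than the number of available processors. The output above is consistent with a 4-core machine, although that is an inference from the thread names. Here is a minimal sketch for checking both values on your own machine; the class and method names are only illustrative:

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Target number of worker threads in the shared common pool.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:    " + processors);
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The default can be overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property, but that setting applies to everything in the JVM that uses the common pool.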
Approach 3: Using CompletableFutures
Let's see if CompletableFutures perform any better:
    public static void useCompletableFuture(List<MyTask> tasks) {
        long start = System.nanoTime();
        // Submit every task first so they can all start running
        List<CompletableFuture<Integer>> futures = tasks.stream()
                .map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
                .collect(Collectors.toList());
        // Then wait for each result in turn
        List<Integer> result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
    }
In the code above, we first obtain a list of CompletableFutures and then invoke the join method on each future to wait for them to complete one by one. Note that join does the same job as get, with the only difference being that join doesn't throw any checked exceptions (a failure surfaces as an unchecked CompletionException, whereas get declares InterruptedException and ExecutionException), so it's more convenient in a lambda expression.
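To make the difference between join and get concrete, here is a small sketch. The class and method names are hypothetical, and the futures are completed by hand so that the behaviour does not depend on timing:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class JoinVsGet {
    // join() compiles without a try/catch; the catch here only demonstrates
    // that a failure arrives as an unchecked CompletionException.
    static String describeJoin(CompletableFuture<Integer> future) {
        try {
            return "join -> " + future.join();
        } catch (CompletionException e) {
            return "join failed: " + e.getCause().getMessage();
        }
    }

    // get() declares two checked exceptions, so the caller must handle them.
    static String describeGet(CompletableFuture<Integer> future) {
        try {
            return "get -> " + future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "get interrupted";
        } catch (ExecutionException e) {
            return "get failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = CompletableFuture.completedFuture(42);
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describeJoin(ok));     // join -> 42
        System.out.println(describeJoin(failed)); // join failed: boom
        System.out.println(describeGet(ok));      // get -> 42
        System.out.println(describeGet(failed));  // get failed: boom
    }
}
```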
Also, you must use two separate stream pipelines, as opposed to putting the two map operations after each other. Intermediate stream operations are lazy, so in a single pipeline each future would be created and then immediately joined before the next one is created, and you would end up processing your tasks sequentially! That's why you first need to collect your CompletableFutures in a list, so that they can all start before you wait for their completion.
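The laziness pitfall is easier to see if you record the order in which futures are submitted and joined. The sketch below does this with trivial tasks; the class and its helper methods are hypothetical and exist only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class LazyPipelineDemo {
    // Records the submission on the calling thread, then starts a trivial async task.
    static CompletableFuture<Integer> submit(int i, List<String> events) {
        events.add("submit " + i);
        return CompletableFuture.supplyAsync(() -> i);
    }

    // Waits for the future and records the join on the calling thread.
    static int awaitResult(CompletableFuture<Integer> future, List<String> events) {
        int value = future.join();
        events.add("join " + value);
        return value;
    }

    // Single pipeline: each element is submitted and joined before the next one is touched.
    static List<String> chained(int n) {
        List<String> events = new ArrayList<>();
        IntStream.range(0, n)
                .mapToObj(i -> submit(i, events))
                .map(f -> awaitResult(f, events))
                .collect(Collectors.toList());
        return events;
    }

    // Two pipelines: every task is submitted before the first join.
    static List<String> collectedFirst(int n) {
        List<String> events = new ArrayList<>();
        List<CompletableFuture<Integer>> futures = IntStream.range(0, n)
                .mapToObj(i -> submit(i, events))
                .collect(Collectors.toList());
        futures.stream()
                .map(f -> awaitResult(f, events))
                .collect(Collectors.toList());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(chained(3));
        // [submit 0, join 0, submit 1, join 1, submit 2, join 2]
        System.out.println(collectedFirst(3));
        // [submit 0, submit 1, submit 2, join 0, join 1, join 2]
    }
}
```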
The output is
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-2
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-1
    Processed 10 tasks in 4010 millis
It took 4 seconds to process 10 tasks. You will notice that only 3 ForkJoinPool threads were used and that, unlike the parallel stream, the main thread was not used. With only three threads doing the work, the 10 tasks needed four rounds instead of three.
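The timings so far follow a simple pattern: the tasks run in rounds of at most one task per thread. The sketch below turns that into a rough estimate. It is an idealised model that assumes equal-length tasks and even distribution across threads, and the class and method names are made up for illustration:

```java
class RoundsEstimate {
    // Idealised wall-clock time: ceil(tasks / threads) rounds, each lasting one task.
    static int estimatedSeconds(int tasks, int threads, int secondsPerTask) {
        int rounds = (tasks + threads - 1) / threads; // integer ceiling division
        return rounds * secondsPerTask;
    }

    public static void main(String[] args) {
        System.out.println(estimatedSeconds(10, 1, 1)); // sequential: 10
        System.out.println(estimatedSeconds(10, 4, 1)); // parallel stream, 3 workers + main: 3
        System.out.println(estimatedSeconds(10, 3, 1)); // CompletableFuture, 3 workers: 4
    }
}
```

By the same estimate, one thread per task would bring the total down to about 1 second.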
Approach 4: Using CompletableFutures with a custom Executor
One of the advantages of CompletableFutures over parallel streams is that they allow you to specify a different Executor to submit their tasks to. This means that you can choose a more suitable number of threads based on your application. Since my example is not very CPU-intensive, I can choose to increase the number of threads to be greater than Runtime.getRuntime().availableProcessors(), as shown below:
    public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
        long start = System.nanoTime();
        // One thread per task, capped at 10 threads
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
        List<CompletableFuture<Integer>> futures = tasks.stream()
                .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
                .collect(Collectors.toList());
        List<Integer> result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
        System.out.println(result);
        executor.shutdown();
    }
The output is
    pool-1-thread-2
    pool-1-thread-4
    pool-1-thread-3
    pool-1-thread-1
    pool-1-thread-5
    pool-1-thread-6
    pool-1-thread-7
    pool-1-thread-8
    pool-1-thread-9
    pool-1-thread-10
    Processed 10 tasks in 1009 millis
After this improvement, it now takes only 1 second to process 10 tasks.
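One detail worth keeping in mind with a custom executor is that executor.shutdown() is only reached if every join() succeeds. Threads created by Executors.newFixedThreadPool are non-daemon by default, so a failing task could leave the pool alive and keep the JVM from exiting. Here is a minimal sketch of a variant that shuts the pool down in a finally block. BoundedAsyncRunner, runAll and maxThreads are hypothetical names, and the Math.max(1, ...) guard is an addition to avoid requesting a zero-sized pool for an empty task list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class BoundedAsyncRunner {
    // Runs all tasks on a dedicated pool of at most maxThreads threads
    // and always shuts the pool down, even if a task fails.
    static <T> List<T> runAll(List<Supplier<T>> tasks, int maxThreads) {
        int poolSize = Math.max(1, Math.min(tasks.size(), maxThreads));
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit everything first so the tasks can run concurrently
            List<CompletableFuture<T>> futures = tasks.stream()
                    .map(t -> CompletableFuture.supplyAsync(t, executor))
                    .collect(Collectors.toList());
            // Then wait for the results in submission order
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Supplier<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> 3);
        System.out.println(runAll(tasks, 10)); // [1, 2, 3]
    }
}
```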
As you can see, CompletableFutures provide more control over the size of the thread pool and should be used if your tasks involve I/O. However, if you're doing CPU-intensive operations, there's no point in having more threads than processors, so go for a parallel stream, as it is easier to use.