Run a list of futures:
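List<String> taskNames = ...
// Start every task first. Joining inside the same lazy stream would run the tasks one at a time.
List<CompletableFuture<Boolean>> futures = taskNames.stream()
    .map(task -> CompletableFuture.supplyAsync(() -> {
        System.out.println(task);
        return true;
    }))
    .collect(Collectors.toList());
// Then wait for all of them and gather the results.
List<Boolean> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());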
Future
- get: a blocking method that waits until the future has completed. Note that Future is not a Java 8 feature; it has been part of java.util.concurrent since Java 5.
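As a minimal sketch of the blocking behaviour of get, the example below submits a Callable to an ExecutorService and waits for its result. The class name FutureGetDemo and the method slowAnswer are hypothetical names chosen for illustration, and the 200 ms sleep only simulates work.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureGetDemo {
    // Hypothetical helper: runs a slow task on another thread and blocks on get().
    static int slowAnswer() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(200); // simulate work
                return 42;
            });
            // get() blocks the calling thread until the task has finished.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(slowAnswer());
    }
}
```

Unlike CompletableFuture, a plain Future offers no way to attach a callback; the caller has to block on get or poll isDone.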
CompletableFuture
- supplyAsync: runs a Supplier asynchronously and completes with the value it returns.
- runAsync: runs a Runnable asynchronously; the resulting future carries no value.
- thenApply (like map): transforms the result of the previous stage and returns a new future holding the transformed value.
- thenAccept: consumes the result of the previous stage in place without returning anything.
- allOf: returns a CompletableFuture<Void> that completes once all the given futures have completed. It doesn't return the results of the completableFutures, so each result has to be read individually.
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
// the result of each one can be processed as follows:
String combined = Stream.of(future1, future2, future3)
    .map(CompletableFuture::join)
    .collect(Collectors.joining(" "));
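To make supplyAsync, runAsync, thenApply and thenAccept from the list above more concrete, here is a small sketch. The class ChainDemo and its methods greet and consume are hypothetical names, not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ChainDemo {
    // supplyAsync produces a value; thenApply transforms it in a new future.
    static String greet(String name) {
        return CompletableFuture
                .supplyAsync(() -> "Hello " + name)
                .thenApply(String::toUpperCase)
                .join();
    }

    // thenAccept consumes the value; the resulting future is a CompletableFuture<Void>.
    static String consume(String value) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> value)
                .thenAccept(seen::set);
        done.join(); // wait until the consumer has run
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(greet("world")); // HELLO WORLD
        // runAsync takes a Runnable, so there is no value to pass on.
        CompletableFuture.runAsync(() -> System.out.println("side effect only")).join();
        System.out.println(consume("done")); // done
    }
}
```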
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class Main {
    // Assumed shape of JobResult (not shown in the original), inferred from the output below.
    static class JobResult {
        private final String name;
        JobResult(String name) { this.name = name; }
        @Override
        public String toString() { return "JobResult:" + name; }
    }

    public class RunnableJob implements Runnable {
        private final String str;
        private final Consumer<JobResult> consumer;

        public RunnableJob(Consumer<JobResult> consumer, String str) {
            this.str = str;
            this.consumer = consumer;
        }

        @Override
        public void run() {
            // Math.random() is never negative, so no abs() is needed; sec is in 1..6.
            int sec = (int) (Math.random() * 6 + 1);
            System.out.println("Running " + str + " – takes " + sec + " seconds");
            try {
                Thread.sleep(sec * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            consumer.accept(new JobResult(str));
        }
    }

    public void runJobs() {
        Consumer<JobResult> consumer = input -> System.out.println("Consuming " + input);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // thenRun schedules the second job once the first one has completed.
            futures.add(CompletableFuture
                    .runAsync(new RunnableJob(consumer, "Job " + i))
                    .thenRun(new RunnableJob(consumer, "job " + i + " finished")));
        }
        // Block until every chain has completed.
        futures.forEach(CompletableFuture::join);
        System.out.println("REACHED END OF runJobs");
    }

    public static void main(String[] args) {
        new Main().runJobs();
    }
}
/****
Output is:
-------------
Running Job 0 – takes 1 seconds
Running Job 1 – takes 2 seconds
Running Job 3 – takes 2 seconds
Running Job 2 – takes 1 seconds
Running Job 4 – takes 6 seconds
Running Job 5 – takes 5 seconds
Running Job 6 – takes 5 seconds
Consuming JobResult:Job 0
Consuming JobResult:Job 2
Running job 2 finished – takes 2 seconds
Running job 0 finished – takes 4 seconds
Consuming JobResult:Job 3
Consuming JobResult:Job 1
Running job 1 finished – takes 5 seconds
Running job 3 finished – takes 4 seconds
Consuming JobResult:job 2 finished
Running Job 7 – takes 3 seconds
Consuming JobResult:Job 6
Consuming JobResult:job 0 finished
Consuming JobResult:Job 5
Running Job 8 – takes 6 seconds
Running job 6 finished – takes 3 seconds
Running job 5 finished – takes 6 seconds
Consuming JobResult:Job 4
Running job 4 finished – takes 5 seconds
Consuming JobResult:job 3 finished
Consuming JobResult:Job 7
Running job 7 finished – takes 1 seconds
Running Job 9 – takes 2 seconds
Consuming JobResult:job 1 finished
Consuming JobResult:job 7 finished
Consuming JobResult:job 6 finished
Consuming JobResult:Job 9
Running job 9 finished – takes 5 seconds
Consuming JobResult:job 4 finished
Consuming JobResult:job 5 finished
Consuming JobResult:Job 8
Running job 8 finished – takes 1 seconds
Consuming JobResult:job 8 finished
Consuming JobResult:job 9 finished
REACHED END OF runJobs
***/
- thenRun: runs a Runnable once the preceding supplyAsync or runAsync stage has completed. The Runnable does not receive the result of that stage.
public class RunnableJob implements Runnable {
    @Override
    public void run() {
    }
}

public void runJobs() {
    CompletableFuture<Void> f = CompletableFuture.runAsync(new RunnableJob());
    f.thenRun(new RunnableJob());
    CompletableFuture<RunnableJob> f2 = CompletableFuture.supplyAsync(() -> new RunnableJob());
    f2.thenRun(new RunnableJob());
}
- Async tasks: both a Runnable (via runAsync) and a Supplier (via supplyAsync) can be run asynchronously, and both calls return a completable future.
- Monadic design pattern: completable futures can be combined in a chain using thenCompose and thenApply, each of which receives the result of the previous completable future.
- thenCompose (like flatMap): receives a function that itself returns another CompletableFuture, and flattens the nested future into one. The input of the lambda is the result of the previous completableFuture, just like with thenApply.
- thenCombine: used when we want to combine the results of two independent futures into one value.
- thenAcceptBoth: like thenCombine, but it consumes both results and doesn't send a result down the chain.
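The difference between thenCompose, thenCombine and thenAcceptBoth is easier to see side by side. The sketch below is illustrative only; ComposeDemo and the helper lengthOf are hypothetical names standing in for some asynchronous lookup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ComposeDemo {
    // Hypothetical async step used only for illustration.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // thenCompose: the function returns a future, and the result is flattened.
    static int composed() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(ComposeDemo::lengthOf)
                .join();
    }

    // thenCombine: merges the results of two independent futures into one value.
    static String combined() {
        CompletableFuture<String> left = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> right = CompletableFuture.supplyAsync(() -> "World");
        return left.thenCombine(right, (a, b) -> a + " " + b).join();
    }

    // thenAcceptBoth: consumes both results; the returned future holds no value.
    static int acceptedBoth() {
        AtomicInteger total = new AtomicInteger();
        CompletableFuture<Integer> x = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> y = CompletableFuture.supplyAsync(() -> 3);
        x.thenAcceptBoth(y, (a, b) -> total.set(a + b)).join();
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(composed());     // 5
        System.out.println(combined());     // Hello World
        System.out.println(acceptedBoth()); // 5
    }
}
```

With thenApply instead of thenCompose, the first chain would have the type CompletableFuture<CompletableFuture<Integer>>, which is the nesting that thenCompose avoids.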
Functional Interfaces in Java 8
Predicate
Represents a predicate (boolean-valued function) of one argument.
Predicate<Integer> isAnAdult = age -> age >= 18;
We can try using a predicate like this in a stream. Note that count() returns a long.
Predicate<Person> isAnAdult = person -> person.getAge() >= 18;
List<Person> people = getAllPeople();
long nrOfAdults = people.stream().filter(isAnAdult).count();
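A self-contained version of the predicate example might look like the sketch below. The Person class here is a hypothetical minimal stand-in (the original only shows getAge()), and negate() is included to show that predicates can be composed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PredicateDemo {
    // Hypothetical minimal Person type, assumed for this example.
    static class Person {
        private final int age;
        Person(int age) { this.age = age; }
        int getAge() { return age; }
    }

    static final Predicate<Person> IS_AN_ADULT = person -> person.getAge() >= 18;

    static long countAdults(List<Person> people) {
        // Stream.count() returns a long.
        return people.stream().filter(IS_AN_ADULT).count();
    }

    static long countMinors(List<Person> people) {
        // negate() builds the opposite predicate without writing a new lambda.
        return people.stream().filter(IS_AN_ADULT.negate()).count();
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(new Person(12), new Person(18), new Person(40));
        System.out.println(countAdults(people)); // 2
        System.out.println(countMinors(people)); // 1
    }
}
```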
Consumer
Represents an operation that accepts a single input argument and returns no result.
Consumer<Ticket> ticketPrinter = ticket -> ticket.print();
The forEach method added to the Iterable interface in Java 8 takes a Consumer as its argument:
Collection<Ticket> tickets = getTicketsToPrint();
tickets.forEach(ticket -> ticket.print());
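As a runnable sketch of the same idea, the example below passes a Consumer to forEach. Plain strings stand in for the Ticket objects, and ConsumerDemo and printAll are hypothetical names; the "printing" is recorded in a list so the effect can be inspected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ConsumerDemo {
    static List<String> printAll(List<String> tickets) {
        List<String> printed = new ArrayList<>();
        // A Consumer takes one argument and returns nothing; it works by side effect.
        Consumer<String> ticketPrinter = ticket -> printed.add("printed " + ticket);
        tickets.forEach(ticketPrinter);
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(printAll(Arrays.asList("A1", "B2"))); // [printed A1, printed B2]
    }
}
```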
Supplier
A Supplier is a kind of factory: it takes no arguments and just gives you a result, which makes it a good fit for creating instances.
Supplier<TicketHandler> ticketHandlerCreator = () -> new TicketHandler();
Another option is to use a constructor reference.
Supplier<TicketHandler> ticketHandlerCreator = TicketHandler::new;
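One detail worth seeing in code is that a Supplier is lazy: nothing is created until get() is called, and each call invokes the constructor again. The sketch below uses a hypothetical empty TicketHandler class as a stand-in for the one mentioned above.

```java
import java.util.function.Supplier;

class SupplierDemo {
    // Hypothetical stand-in for the TicketHandler mentioned in the text.
    static class TicketHandler {
    }

    static boolean createsFreshInstances() {
        Supplier<TicketHandler> ticketHandlerCreator = TicketHandler::new;
        // Each get() call runs the constructor, so the two objects are distinct.
        TicketHandler first = ticketHandlerCreator.get();
        TicketHandler second = ticketHandlerCreator.get();
        return first != second;
    }

    public static void main(String[] args) {
        System.out.println(createsFreshInstances()); // true
    }
}
```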
Function<T,R>
Represents a function that accepts one argument and produces a result. Let’s just go straight to an example.
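// A function that takes an event name and returns a predicate matching tickets for that event.
Function<String, Predicate<Ticket>> ticketFor = event -> ticket -> event.equals(ticket.getName());
List<Ticket> tickets = getAllTickets();
// count() returns a long, so the result cannot be assigned to an Integer.
long soldTicketsForCoolEvent = tickets.stream()
    .filter(ticketFor.apply("CoolEvent")).count();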
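The example above can be made runnable with a small stand-in for Ticket. In this sketch the Ticket class, FunctionDemo and soldFor are hypothetical; only getName() is taken from the original.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

class FunctionDemo {
    // Hypothetical minimal Ticket type, assumed for this example.
    static class Ticket {
        private final String name;
        Ticket(String name) { this.name = name; }
        String getName() { return name; }
    }

    static long soldFor(String event, List<Ticket> tickets) {
        // A Function that returns a Predicate: apply(event) yields a filter for that event.
        Function<String, Predicate<Ticket>> ticketFor =
                e -> ticket -> e.equals(ticket.getName());
        return tickets.stream().filter(ticketFor.apply(event)).count();
    }

    public static void main(String[] args) {
        List<Ticket> tickets = Arrays.asList(
                new Ticket("CoolEvent"), new Ticket("OtherEvent"), new Ticket("CoolEvent"));
        System.out.println(soldFor("CoolEvent", tickets)); // 2
    }
}
```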