Java CompletableFuture
Componi calcoli asincroni con CompletableFuture: thenApply, thenCompose, allOf, exceptionally e le insidie da evitare.
Future è un handle per un risultato a singolo utilizzo: invii, aspetti, leggi. Non può concatenare. Se vuoi "esegui A, poi con il risultato di A esegui B, poi combina B con C e passa a D" senza scrivere una macchina a stati a mano, hai bisogno di CompletableFuture — la riprogettazione di Java 8 dell'idea di risultato asincrono attorno alla composizione.
CompletableFuture<V> implementa Future<V>, quindi tutta la vecchia API è ancora presente. La novità è l'API dei combinatori: una trentina di metodi che ti permettono di costruire grafi di flusso dati di lavoro asincrono — applicare funzioni, eseguire effetti collaterali, combinare più future, recuperare da eccezioni, applicare timeout — senza mai bloccare un thread in attesa di un risultato intermedio.
I metodi iniziali
Di solito non si costruisce un CompletableFuture direttamente. Si avvia una pipeline con uno di questi:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String> c = CompletableFuture.completedFuture("ready");
CompletableFuture<String> d = CompletableFuture.failedFuture(new IOException("nope"));| Metodo iniziale | Comportamento |
|---|---|
supplyAsync(Supplier) | Esegue un Supplier sul pool comune, restituisce il valore |
runAsync(Runnable) | Esegue un Runnable sul pool comune, nessun valore |
completedFuture(v) | Un future già risolto con il valore dato |
failedFuture(t) | Un future già fallito con il throwable dato |
supplyAsync e runAsync hanno overload che accettano un Executor esplicito. Quasi sempre si vorrà passarne uno. Il default è ForkJoinPool.commonPool() — un pool condiviso dimensionato in base al numero di CPU, ottimo per brevi lavori CPU ma disastroso se vi si mettono operazioni di I/O (una chiamata lenta blocca un core per tutti). Passa sempre un executor esplicito per I/O o lavori a costo sconosciuto.
Concatenazione: thenApply, thenAccept, thenRun
I combinatori più semplici trasformano un future in un altro:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> b = a.thenApply(n -> "value is " + n); // transform
CompletableFuture<Void> c = a.thenAccept(n -> System.out.println(n)); // consume, no result
CompletableFuture<Void> d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value| Metodo | Tipo lambda | Restituisce |
|---|---|---|
thenApply | Function<T,U> | CompletableFuture<U> |
thenAccept | Consumer<T> | CompletableFuture<Void> |
thenRun | Runnable | CompletableFuture<Void> |
Ogni metodo ha tre varianti:
thenApply(fn)— esegue sul thread che completa lo stage precedentethenApplyAsync(fn)— esegue sul pool comunethenApplyAsync(fn, executor)— esegue su un executor specifico
La forma senza Async è la più veloce (nessun cambio di thread) ma significa che fn viene eseguita sul thread che ha completato lo stage precedente — possibilmente il thread di I/O che non vuoi occupare con lavoro CPU. Le forme *Async sono il default più sicuro nelle pipeline eterogenee.
thenCompose — appiattire un future di un future
thenApply va bene quando la funzione restituisce un valore semplice. Quando restituisce un altro CompletableFuture, non vuoi un CompletableFuture<CompletableFuture<V>> — vuoi thenCompose:
CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
// ^ Function<User, CompletableFuture<Profile>>thenCompose è flatMap per i future. Usalo ogni volta che il passo successivo è a sua volta asincrono; usa thenApply quando non lo è.
Combinare due future: thenCombine
Quando si hanno due valori asincroni indipendenti e si vogliono combinare:
CompletableFuture<Integer> price = fetchPrice(symbol);
CompletableFuture<Integer> shares = fetchShares(account);
CompletableFuture<Integer> total = price.thenCombine(shares, (p, s) -> p * s);thenCombine attende entrambi gli input, poi applica una BiFunction ai loro risultati. I due future vengono eseguiti in parallelo — price e shares sono già in corso quando thenCombine viene registrato. Il combinatore viene eseguito sul thread che completa per secondo.
La versione "any", applyToEither, prende il primo risultato e ignora il secondo.
Molti future: allOf e anyOf
Quando il parallelismo riguarda una collection di future:
List<CompletableFuture<String>> all = ids.stream()
.map(this::fetchAsync)
.toList();
CompletableFuture<Void> doneAll = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));allOf si completa quando ogni input è terminato. Restituisce CompletableFuture<Void> — per ottenere effettivamente la lista dei risultati, devi usare thenApply e raccoglierli:
CompletableFuture<List<String>> results = doneAll.thenApply(v ->
all.stream().map(CompletableFuture::join).toList()); // .join() never blocks here — they're all completeanyOf restituisce il valore del primo input che si completa (come Object — non c'è modo di esprimere "uno qualsiasi di questi future tipizzati" con un unico tipo di ritorno).
Gestione degli errori: exceptionally e handle
Un CompletableFuture può fallire (qualsiasi stage che lancia un'eccezione produce un future fallito a valle). I combinatori che recuperano o trasformano:
CompletableFuture<String> safe = riskyAsync()
.exceptionally(ex -> "fallback for: " + ex.getMessage());
CompletableFuture<String> either = riskyAsync()
.handle((value, ex) -> ex == null ? value : "fallback");| Metodo | Quando viene eseguito | Cosa restituisce |
|---|---|---|
exceptionally(fn) | Solo in caso di fallimento; riceve la causa | Valore recuperato |
handle(bi) | Sempre; riceve (value, ex) (uno è null) | Valore trasformato |
whenComplete(bi) | Sempre; riceve (value, ex) | Stesso future, solo effetto collaterale |
exceptionally è il percorso semplice "cattura e sostituisci". handle è più generale: "esegui sempre, decidi in base all'esito" — utile quando vuoi registrare ogni completamento indipendentemente dal successo.
orTimeout e completeOnTimeout
Java 9 ha aggiunto i timeout direttamente all'API dei future:
CompletableFuture<String> withDeadline = riskyAsync()
.orTimeout(2, TimeUnit.SECONDS); // completes exceptionally if not done in 2s
CompletableFuture<String> withDefault = riskyAsync()
.completeOnTimeout("fallback", 2, TimeUnit.SECONDS);Questi permettono di esprimere scadenze senza scrivere il proprio watchdog. Utilizzano thread schedulati interni, quindi sono economici da collegare.
Non bloccare negli stage asincroni
Il principale errore con CompletableFuture: chiamare .get() o .join() all'interno di uno stage Async. Significa che un thread del pool executor rimane inattivo in attesa di un altro thread dello stesso pool — sotto carico, puoi bloccare l'intero pool.
// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
Integer x = anotherFuture().join(); // blocks a pool thread
return x * 2;
});
// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);Se ti ritrovi a usare .get() all'interno di uno stage Async, volevi usare thenCompose/thenApply.
Usare il proprio executor
Il default del pool comune va bene per brevi lavori CPU. Per I/O o qualsiasi cosa che potrebbe bloccare, usa il tuo:
ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));
CompletableFuture.supplyAsync(this::loadFromDb, io)
.thenApplyAsync(this::transform, cpu)
.thenAcceptAsync(this::sendToClient, io);Ogni passo viene eseguito sul pool giusto. Il pool comune rimane libero per parallelStream e altri utilizzi del framework. Questa combinazione è il cuore di un async Java ben comportato.
Un esempio pratico: una piccola pipeline asincrona
Il programma seguente recupera un "utente" e un "profilo" in parallelo, li combina, applica una scadenza e si recupera da un percorso di errore.
Cosa imparare dall'esecuzione:
- La sezione 1 ha usato
thenCombinesu due recuperi indipendenti. Sono stati eseguiti in parallelo —name(50 ms) eage(80 ms) erano già in corso prima che il combinatore si registrasse. Il future combinato si è completato poco dopo la fine del più lento. Questo è il parallelismo: una pipeline asincrona non aspetta ogni passo in sequenza, compone i passi come un grafo. - La sezione 2 ha usato
thenComposeper concatenare passi in cui ogni passo è esso stesso asincrono.thenApplyavrebbe datoCompletableFuture<CompletableFuture<String>>— inutile.thenComposeappiattisce, come faflatMapper gli stream eOptional. - La sezione 3 ha usato
allOfsu una lista e poithenApplyper raccogliere i valori. L'allOfstesso restituisceVoid; la raccolta dei risultati è un flusso separato sui future (ora completi) usandojoin(). Le chiamatejoin()non bloccano qui perché l'allOfsi è già completato. - La sezione 4 ha mostrato
exceptionallyche si recupera da un task che ha lanciato un'eccezione. Il future upstream è fallito; il future downstream ha restituito la stringa di fallback. Senzaexceptionally(ohandle), il fallimento si sarebbe propagato a.join()comeCompletionException. - La sezione 5 ha usato
orTimeoutper applicare una scadenza di 100 ms a un task da 500 ms. Il future si è completato eccezionalmente conTimeoutException; iljoinl'ha ri-lanciata dentroCompletionException. Questa è la forma giusta per "voglio questo risultato, ma solo se arriva abbastanza velocemente". - La sezione 6 ha usato
handleper ramificare su successo/fallimento in un singolo passo.handleviene sempre eseguito e riceve entrambi(value, ex)— uno è null. Utile quando vuoi una coda uniforme della pipeline indipendentemente dal successo del lavoro.
Cosa c'è dopo
Il prossimo capitolo, Java Fork/Join, tratta il ForkJoinPool — il pool con work-stealing che supporta gli stream paralleli e il pool comune di CompletableFuture, e lo strumento giusto per lavori CPU con approccio divide-et-impera.