W3docs

Java CompletableFuture

Componi calcoli asincroni con CompletableFuture: thenApply, thenCompose, allOf, exceptionally e le insidie da evitare.

Future è un handle per un risultato a singolo utilizzo: invii, aspetti, leggi. Non può concatenare. Se vuoi "esegui A, poi con il risultato di A esegui B, poi combina B con C e passa a D" senza scrivere una macchina a stati a mano, hai bisogno di CompletableFuture — la riprogettazione di Java 8 dell'idea di risultato asincrono attorno alla composizione.

CompletableFuture<V> implementa Future<V>, quindi tutta la vecchia API è ancora presente. La novità è l'API dei combinatori: una trentina di metodi che ti permettono di costruire grafi di flusso dati di lavoro asincrono — applicare funzioni, eseguire effetti collaterali, combinare più future, recuperare da eccezioni, applicare timeout — senza mai bloccare un thread in attesa di un risultato intermedio.

I metodi iniziali

Di solito non si costruisce un CompletableFuture direttamente. Si avvia una pipeline con uno di questi:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void>    b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String>  c = CompletableFuture.completedFuture("ready");
CompletableFuture<String>  d = CompletableFuture.failedFuture(new IOException("nope"));
Metodo inizialeComportamento
supplyAsync(Supplier)Esegue un Supplier sul pool comune, restituisce il valore
runAsync(Runnable)Esegue un Runnable sul pool comune, nessun valore
completedFuture(v)Un future già risolto con il valore dato
failedFuture(t)Un future già fallito con il throwable dato

supplyAsync e runAsync hanno overload che accettano un Executor esplicito. Quasi sempre si vorrà passarne uno. Il default è ForkJoinPool.commonPool() — un pool condiviso dimensionato in base al numero di CPU, ottimo per brevi lavori CPU ma disastroso se vi si mettono operazioni di I/O (una chiamata lenta blocca un core per tutti). Passa sempre un executor esplicito per I/O o lavori a costo sconosciuto.

Concatenazione: thenApply, thenAccept, thenRun

I combinatori più semplici trasformano un future in un altro:

CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);

CompletableFuture<String>  b = a.thenApply(n -> "value is " + n);          // transform
CompletableFuture<Void>    c = a.thenAccept(n -> System.out.println(n));    // consume, no result
CompletableFuture<Void>    d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value
MetodoTipo lambdaRestituisce
thenApplyFunction<T,U>CompletableFuture<U>
thenAcceptConsumer<T>CompletableFuture<Void>
thenRunRunnableCompletableFuture<Void>

Ogni metodo ha tre varianti:

  • thenApply(fn) — esegue sul thread che completa lo stage precedente
  • thenApplyAsync(fn) — esegue sul pool comune
  • thenApplyAsync(fn, executor) — esegue su un executor specifico

La forma senza Async è la più veloce (nessun cambio di thread) ma significa che fn viene eseguita sul thread che ha completato lo stage precedente — possibilmente il thread di I/O che non vuoi occupare con lavoro CPU. Le forme *Async sono il default più sicuro nelle pipeline eterogenee.

thenCompose — appiattire un future di un future

thenApply va bene quando la funzione restituisce un valore semplice. Quando restituisce un altro CompletableFuture, non vuoi un CompletableFuture<CompletableFuture<V>> — vuoi thenCompose:

CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
//                                          ^ Function<User, CompletableFuture<Profile>>

thenCompose è flatMap per i future. Usalo ogni volta che il passo successivo è a sua volta asincrono; usa thenApply quando non lo è.

Combinare due future: thenCombine

Quando si hanno due valori asincroni indipendenti e si vogliono combinare:

CompletableFuture<Integer> price   = fetchPrice(symbol);
CompletableFuture<Integer> shares  = fetchShares(account);
CompletableFuture<Integer> total   = price.thenCombine(shares, (p, s) -> p * s);

thenCombine attende entrambi gli input, poi applica una BiFunction ai loro risultati. I due future vengono eseguiti in parallelo — price e shares sono già in corso quando thenCombine viene registrato. Il combinatore viene eseguito sul thread che completa per secondo.

La versione "any", applyToEither, prende il primo risultato e ignora il secondo.

Molti future: allOf e anyOf

Quando il parallelismo riguarda una collection di future:

List<CompletableFuture<String>> all = ids.stream()
    .map(this::fetchAsync)
    .toList();

CompletableFuture<Void> doneAll  = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));

allOf si completa quando ogni input è terminato. Restituisce CompletableFuture<Void> — per ottenere effettivamente la lista dei risultati, devi usare thenApply e raccoglierli:

CompletableFuture<List<String>> results = doneAll.thenApply(v ->
    all.stream().map(CompletableFuture::join).toList());        // .join() never blocks here — they're all complete

anyOf restituisce il valore del primo input che si completa (come Object — non c'è modo di esprimere "uno qualsiasi di questi future tipizzati" con un unico tipo di ritorno).

Gestione degli errori: exceptionally e handle

Un CompletableFuture può fallire (qualsiasi stage che lancia un'eccezione produce un future fallito a valle). I combinatori che recuperano o trasformano:

CompletableFuture<String> safe = riskyAsync()
    .exceptionally(ex -> "fallback for: " + ex.getMessage());

CompletableFuture<String> either = riskyAsync()
    .handle((value, ex) -> ex == null ? value : "fallback");
MetodoQuando viene eseguitoCosa restituisce
exceptionally(fn)Solo in caso di fallimento; riceve la causaValore recuperato
handle(bi)Sempre; riceve (value, ex) (uno è null)Valore trasformato
whenComplete(bi)Sempre; riceve (value, ex)Stesso future, solo effetto collaterale

exceptionally è il percorso semplice "cattura e sostituisci". handle è più generale: "esegui sempre, decidi in base all'esito" — utile quando vuoi registrare ogni completamento indipendentemente dal successo.

orTimeout e completeOnTimeout

Java 9 ha aggiunto i timeout direttamente all'API dei future:

CompletableFuture<String> withDeadline = riskyAsync()
    .orTimeout(2, TimeUnit.SECONDS);                  // completes exceptionally if not done in 2s

CompletableFuture<String> withDefault = riskyAsync()
    .completeOnTimeout("fallback", 2, TimeUnit.SECONDS);

Questi permettono di esprimere scadenze senza scrivere il proprio watchdog. Utilizzano thread schedulati interni, quindi sono economici da collegare.

Non bloccare negli stage asincroni

Il principale errore con CompletableFuture: chiamare .get() o .join() all'interno di uno stage Async. Significa che un thread del pool executor rimane inattivo in attesa di un altro thread dello stesso pool — sotto carico, puoi bloccare l'intero pool.

// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
  Integer x = anotherFuture().join();                 // blocks a pool thread
  return x * 2;
});

// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);

Se ti ritrovi a usare .get() all'interno di uno stage Async, volevi usare thenCompose/thenApply.

Usare il proprio executor

Il default del pool comune va bene per brevi lavori CPU. Per I/O o qualsiasi cosa che potrebbe bloccare, usa il tuo:

ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));

CompletableFuture.supplyAsync(this::loadFromDb, io)
    .thenApplyAsync(this::transform, cpu)
    .thenAcceptAsync(this::sendToClient, io);

Ogni passo viene eseguito sul pool giusto. Il pool comune rimane libero per parallelStream e altri utilizzi del framework. Questa combinazione è il cuore di un async Java ben comportato.

Un esempio pratico: una piccola pipeline asincrona

Il programma seguente recupera un "utente" e un "profilo" in parallelo, li combina, applica una scadenza e si recupera da un percorso di errore.

java— editable, runs on the server

Cosa imparare dall'esecuzione:

  • La sezione 1 ha usato thenCombine su due recuperi indipendenti. Sono stati eseguiti in paralleloname (50 ms) e age (80 ms) erano già in corso prima che il combinatore si registrasse. Il future combinato si è completato poco dopo la fine del più lento. Questo è il parallelismo: una pipeline asincrona non aspetta ogni passo in sequenza, compone i passi come un grafo.
  • La sezione 2 ha usato thenCompose per concatenare passi in cui ogni passo è esso stesso asincrono. thenApply avrebbe dato CompletableFuture<CompletableFuture<String>> — inutile. thenCompose appiattisce, come fa flatMap per gli stream e Optional.
  • La sezione 3 ha usato allOf su una lista e poi thenApply per raccogliere i valori. L'allOf stesso restituisce Void; la raccolta dei risultati è un flusso separato sui future (ora completi) usando join(). Le chiamate join() non bloccano qui perché l'allOf si è già completato.
  • La sezione 4 ha mostrato exceptionally che si recupera da un task che ha lanciato un'eccezione. Il future upstream è fallito; il future downstream ha restituito la stringa di fallback. Senza exceptionally (o handle), il fallimento si sarebbe propagato a .join() come CompletionException.
  • La sezione 5 ha usato orTimeout per applicare una scadenza di 100 ms a un task da 500 ms. Il future si è completato eccezionalmente con TimeoutException; il join l'ha ri-lanciata dentro CompletionException. Questa è la forma giusta per "voglio questo risultato, ma solo se arriva abbastanza velocemente".
  • La sezione 6 ha usato handle per ramificare su successo/fallimento in un singolo passo. handle viene sempre eseguito e riceve entrambi (value, ex) — uno è null. Utile quando vuoi una coda uniforme della pipeline indipendentemente dal successo del lavoro.

Cosa c'è dopo

Il prossimo capitolo, Java Fork/Join, tratta il ForkJoinPool — il pool con work-stealing che supporta gli stream paralleli e il pool comune di CompletableFuture, e lo strumento giusto per lavori CPU con approccio divide-et-impera.

Esercitazione

Pratica
Scrivi `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. All'interno della lambda chiami `.get()` su un altro future inviato allo stesso pool predefinito. Qual è il rischio?
Scrivi `CompletableFuture.supplyAsync(() -> { Integer x = otherFuture().get(); return x * 2; })`. All'interno della lambda chiami `.get()` su un altro future inviato allo stesso pool predefinito. Qual è il rischio?
Was this page helpful?