W3docs

Java Parallel Streams

Elabora stream Java in parallelo per maggiori prestazioni: quando parallelStream aiuta e quando peggiora le cose.

Uno stream parallelo è la stessa pipeline di stream che hai già scritto, ma la JVM può suddividere la sorgente in blocchi ed elaborarli su più thread. La modifica nel codice è minima:

long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
//              ^^^^^^^^^^^^^^^^^

oppure:

long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();

La forma della pipeline, le operazioni, il risultato — tutto invariato. Ciò che cambia è chi la esegue: invece di un singolo thread che scorre la sorgente, diversi worker del ForkJoinPool comune (uno per core CPU, meno uno) si dividono il lavoro e un coordinatore unisce i risultati parziali. Quando il lavoro per elemento è abbastanza pesante e la sorgente si divide agevolmente, la pipeline termina in circa tempo reale / core. Quando non è così, il parallelo è più lento del sequenziale — e a volte scorretto. Questo capitolo spiega come distinguere i due casi.

Cosa fa realmente "parallel"

Uno stream sequenziale elabora un elemento alla volta attraverso la pipeline, poi il successivo. Uno stream parallelo:

  1. Divide la sorgente in sotto-stream tramite il Spliterator della sorgente. Array, ArrayList, IntStream.range e sorgenti simili si dividono agevolmente in O(1). LinkedList, Files.lines, Stream.iterate e Stream.generate si dividono male o rifiutano di dividersi.
  2. Esegue la catena intermedia di ciascun sotto-stream su un thread worker del pool comune.
  3. Unisce i risultati parziali — per reduce e collect, è a questo che serve il combiner.

forEach in uno stream parallelo chiama il tuo Consumer da più thread in modo concorrente e in ordine non specificato. forEachOrdered preserva l'ordine di incontro al costo della sincronizzazione. findFirst in parallelo è più costoso di findAny per la stessa ragione — deve coordinarsi per identificare la prima corrispondenza.

Il contratto — cosa deve rispettare la tua pipeline

Il parallelo produce una risposta corretta solo quando la pipeline rispetta tre regole. Il codice sequenziale che le ignora funziona comunque; il codice parallelo che le ignora produce silenziosamente risultati errati.

  1. Il riduttore deve essere associativo. f(f(a, b), c) == f(a, f(b, c)). +, *, max, min, unione di insiemi, concatenazione di liste sono tutti validi. Sottrazione, divisione, "prima corrispondenza" e "lista-append con ordine" non lo sono. Se passi un BinaryOperator non associativo a reduce o Collectors.reducing, la risposta dipende da come la JVM decide di dividere.
  2. La pipeline deve essere stateless. Le tue lambda non devono leggere né scrivere stato mutabile condiviso. Una lambda che cattura e modifica un ArrayList esterno, incrementa un int[] esterno o usa qualsiasi contatore non atomico andrà in race condition in parallelo.
  3. La pipeline deve essere priva di effetti collaterali. Il logging va bene; persistere attraverso un sink thread-safe va bene; tutto il resto è un bug in attesa che un worker lo interleave in modo diverso.

I collector inclusi in Collectors soddisfano 1–3 per costruzione (se usati come documentato). Sono le tue lambda all'interno di map, filter, reduce e peek quelle da controllare.

Quando il parallelo aiuta (e quando no)

Uno stream parallelo vince solo quando il lavoro per elemento è abbastanza grande da sovrastare il costo di coordinazione — suddivisione, scheduling, unione e il bookkeeping del framework. Un modello mentale approssimativo:

  • Sorgente grande + lavoro per elemento CPU-bound + merge economico + sorgente divisibile = il parallelo spesso vince. Elaborazione di immagini per pixel, parsing per record, hashing per file — casi classici.
  • Sorgente piccola = il sequenziale vince. Il wake-up del pool è più costoso dell'intera computazione.
  • Lavoro per elemento economico = il sequenziale vince. nums.stream().mapToInt(Integer::intValue).sum() è più veloce del suo equivalente parallelStream() fino a quando nums non raggiunge i milioni; con dimensioni ridotte l'overhead del framework domina.
  • I/O bloccante per elemento = gli stream paralleli sono lo strumento sbagliato. Il ForkJoinPool comune è dimensionato per il lavoro CPU; una chiamata I/O bloccante occupa un worker e affama tutti gli altri stream paralleli nella JVM (inclusi quelli delle librerie). Usa CompletableFuture con un executor limitato per il fan-out I/O.
  • Sorgente non divisibile = il parallelo ricade in modalità sequenziale o si divide male. Files.lines, Stream.iterate, Stream.generate e LinkedList.stream() sono i classici esempi di sorgenti con scarsa divisibilità; array, ArrayList e IntStream.range sono quelli con ottima divisibilità.

Il consiglio onesto: prediligi il sequenziale; passa al parallelo solo quando hai una ragione misurata, con numeri da jmh o da wall-clock alla mano.

Operazioni che si comportano diversamente in parallelo

Alcune operazioni il cui significato cambia quando la pipeline diventa parallela:

  • forEach — viene eseguito da più thread, in ordine non specificato. Se l'ordine è importante, usa forEachOrdered (che ha un costo di sincronizzazione).
  • findFirst — deve coordinarsi tra i worker per identificare la prima corrispondenza nell'ordine di incontro. Usa findAny se non ti importa quale corrispondenza vince.
  • limit / skip — ben definiti su stream ordinati, ma più costosi in parallelo perché la JVM deve rispettare l'ordine. Su uno stream parallelo in cui l'ordine non conta, stream.parallel().unordered().limit(n) è più economico.
  • distinct / sorted — devono coordinarsi tra i worker; il buffer che mantengono è condiviso.
  • reduce con l'overload a 3 argomenti usa il combiner per unire gli output dei worker. Con l'overload a 2 argomenti, la JVM usa l'identità due volte più l'accumulatore — stesso contratto, stessa regola di associatività.
  • collect — i Collectors sono progettati per essere sicuri in parallelo; il punto critico è che il container del risultato potrebbe essere una normale HashMap o ArrayList, e la raccolta parallela si coordina internamente per mantenerla sicura. I tuoi collector downstream devono rispettare il contratto.

La trappola dello stato condiviso, in forma concreta

Il bug più comune nel codice parallelo per principianti:

// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));

ArrayList.add non è thread-safe; i worker concorrenti perdono elementi, aggiungono duplicati, lanciano ArrayIndexOutOfBoundsException o corrompono silenziosamente la lista. La versione corretta esprime il risultato come output della pipeline, non come effetto collaterale:

List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();

toList(), come ogni altro collector e terminal che produce un valore, è progettato per l'uso parallelo. Nel momento in cui usi un forEach che muta una variabile esterna, hai abbandonato la strada sicura.

Se hai davvero bisogno di un sink thread-safe per forEach, usa ConcurrentLinkedQueue, AtomicLong, LongAdder o Collections.synchronizedList(...). Ma quasi sempre la risposta giusta è "non usare forEach per l'accumulo; lascia che la pipeline costruisca il risultato".

ForkJoinPool e perché è importante

Per impostazione predefinita, ogni stream parallelo nella JVM condivide il pool comune, dimensionato a Runtime.getRuntime().availableProcessors() - 1 thread worker. Ciò ha due conseguenze:

  • Uno stream parallelo di lunga durata monopolizza il pool. Qualsiasi altro stream parallelo — inclusi quelli all'interno delle librerie — si accoderà dietro di esso.
  • Uno stream parallelo che si blocca (I/O, lock, Thread.sleep) occupa un thread worker senza svolgere alcun lavoro, dimezzando la dimensione effettiva del pool mentre attende.

Puoi dedicare un pool privato a una pipeline isolata:

try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
    long total = pool.submit(() ->
        nums.parallelStream().mapToLong(n -> heavy(n)).sum()
    ).get();
}

Questa è la scelta giusta per una computazione intensiva di lunga durata che non vuoi condividere con il resto della JVM. È comunque la scelta sbagliata per I/O bloccante — in quel caso, passa ai virtual thread o a una catena CompletableFuture esplicita su un executor I/O limitato.

Un esempio pratico: speed-up parallelo, la trappola dello stato condiviso e un bug di associatività

Il programma seguente misura il tempo di esecuzione sequenziale vs. parallelo per una somma IntStream CPU-bound, dimostra la race condition dello stato condiviso con forEach, mostra la versione corretta basata su collector e confronta i riduttori associativi (Integer::sum) con quelli non associativi ((a, b) -> a - b) in parallelo.

java— editable, runs on the server

Cosa ricavare dall'esecuzione:

  • La somma parallela ha prodotto lo stesso risultato di quella sequenziale e (su qualsiasi macchina multi-core) ha terminato in una frazione del tempo reale. La chiamata heavy per elemento è CPU-bound e la sorgente (un int[]) si divide agevolmente — i due ingredienti di cui il parallelo ha bisogno.
  • Il forEach che mutava badSink ha perso elementi o è andato in crash. Non esiste una correzione che aggiunga un synchronized qui senza rendere la versione parallela più lenta di quella sequenziale. La soluzione è non scrivere forEach per l'accumulo — usa un collector o un terminal che produca il risultato.
  • Integer::sum è associativo; la riduzione parallela ha prodotto la stessa risposta di quella sequenziale. Il non-associativo (a, b) -> a - b ha prodotto risposte diverse in sequenziale vs. parallelo perché la JVM è libera di dividere e unire in qualsiasi ordine associativamente equivalente. Stesso codice, due risposte — il sintomo che ogni bug negli stream paralleli produce alla fine.
  • parallel().forEach(...) ha stampato 0..15 in un ordine non monotono; parallel().forEachOrdered(...) li ha stampati in ordine al costo della sincronizzazione tra worker. Se il tuo forEach tiene all'ordine, lo stai pagando.
  • Il ForkJoinPool(2) privato ha eseguito la pipeline contro un pool dedicato. Usalo quando hai un lavoro computazionale di lunga durata e non vuoi che condivida il pool comune con il resto della JVM. Non usarlo come pezza per l'I/O bloccante — quello è un problema diverso con uno strumento diverso.

Cosa viene dopo

Ora puoi ragionare su qualsiasi pipeline di stream: quando scriverne una, come costruirla, cosa è lazy, cosa fa short-circuit, cosa viene eseguito in parallelo in modo sicuro e cosa no. Un'astrazione centrale è ancora sul tavolo — quella che permette a una pipeline di esprimere "questo valore potrebbe essere assente" senza un singolo null. Il prossimo capitolo, Java Optional, tratta Optional<T> — cos'è, dove l'API degli stream lascia i propri punti sciolti e come usare map, flatMap, orElse e ifPresent per scrivere codice null-safe per costruzione.

Pratica

Pratica
`nums.parallelStream().reduce(0, (a, b) -> a - b)` restituisce una risposta diversa rispetto alla sua controparte con `stream()`. Perché?
`nums.parallelStream().reduce(0, (a, b) -> a - b)` restituisce una risposta diversa rispetto alla sua controparte con `stream()`. Perché?
Was this page helpful?