Java Parallel Streams
Elabora stream Java in parallelo per maggiori prestazioni: quando parallelStream aiuta e quando peggiora le cose.
Uno stream parallelo è la stessa pipeline di stream che hai già scritto, ma la JVM può suddividere la sorgente in blocchi ed elaborarli su più thread. La modifica nel codice è minima:
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^oppure:
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();La forma della pipeline, le operazioni, il risultato — tutto invariato. Ciò che cambia è chi la esegue: invece di un singolo thread che scorre la sorgente, diversi worker del ForkJoinPool comune (uno per core CPU, meno uno) si dividono il lavoro e un coordinatore unisce i risultati parziali. Quando il lavoro per elemento è abbastanza pesante e la sorgente si divide agevolmente, la pipeline termina in circa tempo reale / core. Quando non è così, il parallelo è più lento del sequenziale — e a volte scorretto. Questo capitolo spiega come distinguere i due casi.
Cosa fa realmente "parallel"
Uno stream sequenziale elabora un elemento alla volta attraverso la pipeline, poi il successivo. Uno stream parallelo:
- Divide la sorgente in sotto-stream tramite il
Spliteratordella sorgente. Array,ArrayList,IntStream.rangee sorgenti simili si dividono agevolmente in O(1).LinkedList,Files.lines,Stream.iterateeStream.generatesi dividono male o rifiutano di dividersi. - Esegue la catena intermedia di ciascun sotto-stream su un thread worker del pool comune.
- Unisce i risultati parziali — per
reduceecollect, è a questo che serve ilcombiner.
forEach in uno stream parallelo chiama il tuo Consumer da più thread in modo concorrente e in ordine non specificato. forEachOrdered preserva l'ordine di incontro al costo della sincronizzazione. findFirst in parallelo è più costoso di findAny per la stessa ragione — deve coordinarsi per identificare la prima corrispondenza.
Il contratto — cosa deve rispettare la tua pipeline
Il parallelo produce una risposta corretta solo quando la pipeline rispetta tre regole. Il codice sequenziale che le ignora funziona comunque; il codice parallelo che le ignora produce silenziosamente risultati errati.
- Il riduttore deve essere associativo.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, unione di insiemi, concatenazione di liste sono tutti validi. Sottrazione, divisione, "prima corrispondenza" e "lista-append con ordine" non lo sono. Se passi unBinaryOperatornon associativo areduceoCollectors.reducing, la risposta dipende da come la JVM decide di dividere. - La pipeline deve essere stateless. Le tue lambda non devono leggere né scrivere stato mutabile condiviso. Una lambda che cattura e modifica un
ArrayListesterno, incrementa unint[]esterno o usa qualsiasi contatore non atomico andrà in race condition in parallelo. - La pipeline deve essere priva di effetti collaterali. Il logging va bene; persistere attraverso un sink thread-safe va bene; tutto il resto è un bug in attesa che un worker lo interleave in modo diverso.
I collector inclusi in Collectors soddisfano 1–3 per costruzione (se usati come documentato). Sono le tue lambda all'interno di map, filter, reduce e peek quelle da controllare.
Quando il parallelo aiuta (e quando no)
Uno stream parallelo vince solo quando il lavoro per elemento è abbastanza grande da sovrastare il costo di coordinazione — suddivisione, scheduling, unione e il bookkeeping del framework. Un modello mentale approssimativo:
- Sorgente grande + lavoro per elemento CPU-bound + merge economico + sorgente divisibile = il parallelo spesso vince. Elaborazione di immagini per pixel, parsing per record, hashing per file — casi classici.
- Sorgente piccola = il sequenziale vince. Il wake-up del pool è più costoso dell'intera computazione.
- Lavoro per elemento economico = il sequenziale vince.
nums.stream().mapToInt(Integer::intValue).sum()è più veloce del suo equivalenteparallelStream()fino a quandonumsnon raggiunge i milioni; con dimensioni ridotte l'overhead del framework domina. - I/O bloccante per elemento = gli stream paralleli sono lo strumento sbagliato. Il
ForkJoinPoolcomune è dimensionato per il lavoro CPU; una chiamata I/O bloccante occupa un worker e affama tutti gli altri stream paralleli nella JVM (inclusi quelli delle librerie). UsaCompletableFuturecon un executor limitato per il fan-out I/O. - Sorgente non divisibile = il parallelo ricade in modalità sequenziale o si divide male.
Files.lines,Stream.iterate,Stream.generateeLinkedList.stream()sono i classici esempi di sorgenti con scarsa divisibilità; array,ArrayListeIntStream.rangesono quelli con ottima divisibilità.
Il consiglio onesto: prediligi il sequenziale; passa al parallelo solo quando hai una ragione misurata, con numeri da jmh o da wall-clock alla mano.
Operazioni che si comportano diversamente in parallelo
Alcune operazioni il cui significato cambia quando la pipeline diventa parallela:
forEach— viene eseguito da più thread, in ordine non specificato. Se l'ordine è importante, usaforEachOrdered(che ha un costo di sincronizzazione).findFirst— deve coordinarsi tra i worker per identificare la prima corrispondenza nell'ordine di incontro. UsafindAnyse non ti importa quale corrispondenza vince.limit/skip— ben definiti su stream ordinati, ma più costosi in parallelo perché la JVM deve rispettare l'ordine. Su uno stream parallelo in cui l'ordine non conta,stream.parallel().unordered().limit(n)è più economico.distinct/sorted— devono coordinarsi tra i worker; il buffer che mantengono è condiviso.reducecon l'overload a 3 argomenti usa ilcombinerper unire gli output dei worker. Con l'overload a 2 argomenti, la JVM usa l'identità due volte più l'accumulatore — stesso contratto, stessa regola di associatività.collect— iCollectorssono progettati per essere sicuri in parallelo; il punto critico è che il container del risultato potrebbe essere una normaleHashMapoArrayList, e la raccolta parallela si coordina internamente per mantenerla sicura. I tuoi collector downstream devono rispettare il contratto.
La trappola dello stato condiviso, in forma concreta
Il bug più comune nel codice parallelo per principianti:
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add non è thread-safe; i worker concorrenti perdono elementi, aggiungono duplicati, lanciano ArrayIndexOutOfBoundsException o corrompono silenziosamente la lista. La versione corretta esprime il risultato come output della pipeline, non come effetto collaterale:
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList(), come ogni altro collector e terminal che produce un valore, è progettato per l'uso parallelo. Nel momento in cui usi un forEach che muta una variabile esterna, hai abbandonato la strada sicura.
Se hai davvero bisogno di un sink thread-safe per forEach, usa ConcurrentLinkedQueue, AtomicLong, LongAdder o Collections.synchronizedList(...). Ma quasi sempre la risposta giusta è "non usare forEach per l'accumulo; lascia che la pipeline costruisca il risultato".
ForkJoinPool e perché è importante
Per impostazione predefinita, ogni stream parallelo nella JVM condivide il pool comune, dimensionato a Runtime.getRuntime().availableProcessors() - 1 thread worker. Ciò ha due conseguenze:
- Uno stream parallelo di lunga durata monopolizza il pool. Qualsiasi altro stream parallelo — inclusi quelli all'interno delle librerie — si accoderà dietro di esso.
- Uno stream parallelo che si blocca (I/O, lock,
Thread.sleep) occupa un thread worker senza svolgere alcun lavoro, dimezzando la dimensione effettiva del pool mentre attende.
Puoi dedicare un pool privato a una pipeline isolata:
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}Questa è la scelta giusta per una computazione intensiva di lunga durata che non vuoi condividere con il resto della JVM. È comunque la scelta sbagliata per I/O bloccante — in quel caso, passa ai virtual thread o a una catena CompletableFuture esplicita su un executor I/O limitato.
Un esempio pratico: speed-up parallelo, la trappola dello stato condiviso e un bug di associatività
Il programma seguente misura il tempo di esecuzione sequenziale vs. parallelo per una somma IntStream CPU-bound, dimostra la race condition dello stato condiviso con forEach, mostra la versione corretta basata su collector e confronta i riduttori associativi (Integer::sum) con quelli non associativi ((a, b) -> a - b) in parallelo.
Cosa ricavare dall'esecuzione:
- La somma parallela ha prodotto lo stesso risultato di quella sequenziale e (su qualsiasi macchina multi-core) ha terminato in una frazione del tempo reale. La chiamata
heavyper elemento è CPU-bound e la sorgente (unint[]) si divide agevolmente — i due ingredienti di cui il parallelo ha bisogno. - Il
forEachche mutavabadSinkha perso elementi o è andato in crash. Non esiste una correzione che aggiunga unsynchronizedqui senza rendere la versione parallela più lenta di quella sequenziale. La soluzione è non scrivereforEachper l'accumulo — usa un collector o un terminal che produca il risultato. Integer::sumè associativo; la riduzione parallela ha prodotto la stessa risposta di quella sequenziale. Il non-associativo(a, b) -> a - bha prodotto risposte diverse in sequenziale vs. parallelo perché la JVM è libera di dividere e unire in qualsiasi ordine associativamente equivalente. Stesso codice, due risposte — il sintomo che ogni bug negli stream paralleli produce alla fine.parallel().forEach(...)ha stampato0..15in un ordine non monotono;parallel().forEachOrdered(...)li ha stampati in ordine al costo della sincronizzazione tra worker. Se il tuoforEachtiene all'ordine, lo stai pagando.- Il
ForkJoinPool(2)privato ha eseguito la pipeline contro un pool dedicato. Usalo quando hai un lavoro computazionale di lunga durata e non vuoi che condivida il pool comune con il resto della JVM. Non usarlo come pezza per l'I/O bloccante — quello è un problema diverso con uno strumento diverso.
Cosa viene dopo
Ora puoi ragionare su qualsiasi pipeline di stream: quando scriverne una, come costruirla, cosa è lazy, cosa fa short-circuit, cosa viene eseguito in parallelo in modo sicuro e cosa no. Un'astrazione centrale è ancora sul tavolo — quella che permette a una pipeline di esprimere "questo valore potrebbe essere assente" senza un singolo null. Il prossimo capitolo, Java Optional, tratta Optional<T> — cos'è, dove l'API degli stream lascia i propri punti sciolti e come usare map, flatMap, orElse e ifPresent per scrivere codice null-safe per costruzione.