Framework Fork/Join di Java
Dividi il lavoro ricorsivamente e parallelizzalo con il framework Fork/Join: ForkJoinPool, RecursiveTask e work-stealing.
Un normale thread pool è ottimo per "molti task indipendenti." Non lo è per "un task grande che può essere suddiviso ricorsivamente in versioni più piccole di sé stesso." Per questa seconda forma — il lavoro divide-and-conquer — Java dispone di un executor specializzato: il ForkJoinPool. È il pool alla base di parallelStream, CompletableFuture.supplyAsync (quando non viene fornito un executor) e qualsiasi codice scritto con RecursiveTask/RecursiveAction.
Il meccanismo che rende speciale il ForkJoinPool è il work-stealing: ogni worker ha il proprio deque, e quando il proprio deque è vuoto ruba un task dal fondo del deque di un altro worker. Il risultato è un bilanciamento del carico automatico — i worker veloci aiutano quelli lenti senza alcuna coordinazione.
Quando usarlo
Fork/join è lo strumento giusto per:
- Divide-and-conquer ricorsivo. Quicksort, mergesort, visita di alberi, algoritmi numerici ricorsivi, moltiplicazione di matrici per dimezzamento.
- Lavoro CPU-bound parallelizzabile in parti approssimativamente uguali.
- Lavoro con granularità adattiva: dividi se il blocco è grande, esegui direttamente se è piccolo.
È lo strumento sbagliato per:
- Lavoro I/O-bound. Un worker bloccato non ruba — e la dimensione predefinita del pool è il numero di CPU. Blocca un worker e hai perso un core.
- Task indipendenti e non correlati. Un normale
ThreadPoolExecutorè più semplice e altrettanto veloce per questa forma. - Task che dipendono da una pianificazione esterna fissa. Usa
ScheduledExecutorService.
Un modello mentale utile: se useresti un parallelStream per farlo, fork/join è la stessa forma espressa direttamente. (Fork/join è arrivato in Java 7; parallelStream in Java 8 è stato costruito sopra di esso.)
Le tre classi
ForkJoinPool pool; // the executor
RecursiveTask<V>; // an abstract task returning V
RecursiveAction; // an abstract task returning nothingEstendi RecursiveTask o RecursiveAction, sovrascrivi compute(), decidi all'interno di compute() se suddividere o eseguire il lavoro direttamente, e chiama fork()/join() sui sotto-task.
class Sum extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000;
private final long[] data;
private final int lo, hi;
Sum(long[] data, int lo, int hi) {
this.data = data; this.lo = lo; this.hi = hi;
}
@Override
protected Long compute() {
int len = hi - lo;
if (len <= THRESHOLD) {
long s = 0;
for (int i = lo; i < hi; i++) s += data[i];
return s;
}
int mid = lo + len / 2;
Sum left = new Sum(data, lo, mid);
Sum right = new Sum(data, mid, hi);
left.fork(); // schedule left to run on another worker
long rightResult = right.compute(); // run right on this worker (avoid extra task)
long leftResult = left.join(); // wait for left
return leftResult + rightResult;
}
}
ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(new Sum(data, 0, data.length));La struttura — controlla la soglia → dividi → fork di una metà → calcola l'altra → join — è l'idioma canonico del fork/join. Il trucco del "calcola una metà qui invece di fare fork di entrambe" evita di creare un task inutile ed è un piccolo ma reale vantaggio.
La soglia è importante
La decisione più importante: quando smettere di suddividere. Una soglia troppo piccola crea migliaia di task per parti banali — l'overhead domina il lavoro. Troppo grande e non si riesce a sfruttare appieno i core — molti worker restano inattivi mentre uno elabora un blocco grande.
Regole pratiche:
- Il corpo del task dovrebbe richiedere almeno 10 microsecondi. Al di sotto di questo, l'overhead di gestione dei task è comparabile al lavoro stesso.
- Rendi la soglia una costante regolabile.
100,1000,10000sono valori tipici per array primitivi; il numero corretto dipende dal costo per elemento. - Per input molto piccoli, ricadi su un'implementazione puramente seriale. L'overhead di split-and-fork è sprecato su input che entrano in cache.
fork(), join(), invoke()
Le tre operazioni su un RecursiveTask:
| Metodo | Comportamento |
|---|---|
fork() | Pianifica il task nel pool corrente; ritorna immediatamente |
join() | Attende il task e restituisce il suo risultato (o rilancia l'eccezione) |
invoke() | Combinazione di fork + join per questo thread — sincrono |
compute() | Esegue il corpo direttamente sul thread chiamante (nessun fork) |
Nel pattern sopra, left.fork(); right.compute(); left.join(); fa la cosa giusta — fork di una metà a un altro worker, esecuzione dell'altra metà qui, poi attesa che il fork finisca.
Non dovresti scrivere left.fork(); right.fork(); left.join(); right.join();. Il lato destro viene forkato e il worker corrente aspetta — non c'è nessun thread di esecuzione disponibile per eseguire effettivamente right finché il worker non raggiunge join. La combinazione spreca lo slot temporale del worker corrente.
Il pool comune
ForkJoinPool.commonPool() è un pool condiviso a livello di JVM, dimensionato a Runtime.getRuntime().availableProcessors() - 1 per impostazione predefinita. Alimenta:
Stream.parallelStream()CompletableFuture.supplyAsync(supplier)(l'overload senza executor)Arrays.parallelSort()
Puoi configurare la dimensione del pool comune tramite la proprietà di sistema java.util.concurrent.ForkJoinPool.common.parallelism all'avvio della JVM. Non dovresti usare il pool comune per I/O — una singola chiamata bloccante occupa un worker condiviso dall'intera JVM.
Work-stealing illustrato
worker-1 deque: [t1 t2 t3 t4] (it forked these; t4 just got pushed)
worker-2 deque: [] (empty — workers steal)
worker-3 deque: [t10 t11] (still has its own)
worker-2 finds its deque empty; steals t1 from the BOTTOM of worker-1's deque
worker-1 keeps pulling its own tasks from the TOPIl deque a doppia estremità è il cuore del design: i worker inseriscono ed estraggono da un'estremità (LIFO — localizzazione di riferimento per i cache hit), i ladri prendono dall'altra estremità (FIFO — contesa minima con il proprietario). È per questo che fork/join funziona così bene: i worker raramente entrano in contesa sulle strutture dati degli altri anche sotto carico intenso.
Un esempio concreto: somma parallela vs. seriale
Il programma seguente somma un array di 10 milioni di elementi in due modi — ciclo seriale e ricorsione fork/join — e stampa il tempo di parete per ciascuno.
Cosa ricavare dall'esecuzione:
- La versione fork/join è risultata molte volte più veloce del ciclo seriale. Su una macchina con
Ncore il limite superiore è circaN×— il numero effettivo è stato inferiore perché la JVM, il GC e altri thread JVM richiedevano anch'essi CPU, e il lavoro alla soglia non è perfettamente bilanciato. Tuttavia, un'accelerazione sostanziale per poche righe di codice ricorsivo. - Entrambe le somme erano uguali. Questo è il controllo di correttezza partition-and-merge: ogni foglia ha sommato la sua slice non sovrapposta; il passo di combinazione (
l + r) le ha sommate; nessun doppio conteggio o data race perché ogni foglia ha scritto nella propria variabile locale. - La variante
SumTinycon soglia10era più lenta del ciclo seriale. Con 10M di elementi suddivisi fino a blocchi di 10, si creano circa 2M di task — e l'overhead di gestione dei task supera di gran lunga il lavoro di addizione effettivo. La soglia è una vera manopola da regolare; esegui benchmark su input rappresentativi. - Il pattern
left.fork(); long r = right.compute(); long l = left.join();usa un task in meno rispetto afork(); fork(); join(); join();. Il worker corrente ha tempo libero durante ilcompute()— usarlo per una delle metà risparmia un'intera allocazione di task. Questo è il piccolo ma cumulativo vantaggio su molti carichi di lavoro reali. ForkJoinPool.commonPool()è stato l'executor usato in questa demo. Per un'esecuzione una tantum, il pool comune va bene. Per un programma a lunga esecuzione che mescola lavoro fork/join con chiamateparallelsu stream e future asincroni, assegna al carico fork/join pesante il proprio pool — il pool comune è pensato per brevi raffiche, non per calcolo intensivo stabile.
Cosa c'è dopo
Il capitolo successivo, Collezioni Concorrenti Java, tratta le strutture dati progettate per essere utilizzate intensamente da più thread — ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue e il resto di java.util.concurrent.