W3docs

Framework Fork/Join di Java

Dividi il lavoro ricorsivamente e parallelizzalo con il framework Fork/Join: ForkJoinPool, RecursiveTask e work-stealing.

Un normale thread pool è ottimo per "molti task indipendenti." Non lo è per "un task grande che può essere suddiviso ricorsivamente in versioni più piccole di sé stesso." Per questa seconda forma — il lavoro divide-and-conquer — Java dispone di un executor specializzato: il ForkJoinPool. È il pool alla base di parallelStream, CompletableFuture.supplyAsync (quando non viene fornito un executor) e qualsiasi codice scritto con RecursiveTask/RecursiveAction.

Il meccanismo che rende speciale il ForkJoinPool è il work-stealing: ogni worker ha il proprio deque, e quando il proprio deque è vuoto ruba un task dal fondo del deque di un altro worker. Il risultato è un bilanciamento del carico automatico — i worker veloci aiutano quelli lenti senza alcuna coordinazione.

Quando usarlo

Fork/join è lo strumento giusto per:

  • Divide-and-conquer ricorsivo. Quicksort, mergesort, visita di alberi, algoritmi numerici ricorsivi, moltiplicazione di matrici per dimezzamento.
  • Lavoro CPU-bound parallelizzabile in parti approssimativamente uguali.
  • Lavoro con granularità adattiva: dividi se il blocco è grande, esegui direttamente se è piccolo.

È lo strumento sbagliato per:

  • Lavoro I/O-bound. Un worker bloccato non ruba — e la dimensione predefinita del pool è il numero di CPU. Blocca un worker e hai perso un core.
  • Task indipendenti e non correlati. Un normale ThreadPoolExecutor è più semplice e altrettanto veloce per questa forma.
  • Task che dipendono da una pianificazione esterna fissa. Usa ScheduledExecutorService.

Un modello mentale utile: se useresti un parallelStream per farlo, fork/join è la stessa forma espressa direttamente. (Fork/join è arrivato in Java 7; parallelStream in Java 8 è stato costruito sopra di esso.)

Le tre classi

ForkJoinPool pool;                                    // the executor
RecursiveTask<V>;                                     // an abstract task returning V
RecursiveAction;                                      // an abstract task returning nothing

Estendi RecursiveTask o RecursiveAction, sovrascrivi compute(), decidi all'interno di compute() se suddividere o eseguire il lavoro direttamente, e chiama fork()/join() sui sotto-task.

class Sum extends RecursiveTask<Long> {
  private static final int THRESHOLD = 1000;
  private final long[] data;
  private final int lo, hi;

  Sum(long[] data, int lo, int hi) {
    this.data = data; this.lo = lo; this.hi = hi;
  }

  @Override
  protected Long compute() {
    int len = hi - lo;
    if (len <= THRESHOLD) {
      long s = 0;
      for (int i = lo; i < hi; i++) s += data[i];
      return s;
    }
    int mid = lo + len / 2;
    Sum left  = new Sum(data, lo, mid);
    Sum right = new Sum(data, mid, hi);
    left.fork();                                      // schedule left to run on another worker
    long rightResult = right.compute();               // run right on this worker (avoid extra task)
    long leftResult  = left.join();                   // wait for left
    return leftResult + rightResult;
  }
}

ForkJoinPool pool = new ForkJoinPool();
long total = pool.invoke(new Sum(data, 0, data.length));

La struttura — controlla la soglia → dividi → fork di una metà → calcola l'altra → join — è l'idioma canonico del fork/join. Il trucco del "calcola una metà qui invece di fare fork di entrambe" evita di creare un task inutile ed è un piccolo ma reale vantaggio.

La soglia è importante

La decisione più importante: quando smettere di suddividere. Una soglia troppo piccola crea migliaia di task per parti banali — l'overhead domina il lavoro. Troppo grande e non si riesce a sfruttare appieno i core — molti worker restano inattivi mentre uno elabora un blocco grande.

Regole pratiche:

  • Il corpo del task dovrebbe richiedere almeno 10 microsecondi. Al di sotto di questo, l'overhead di gestione dei task è comparabile al lavoro stesso.
  • Rendi la soglia una costante regolabile. 100, 1000, 10000 sono valori tipici per array primitivi; il numero corretto dipende dal costo per elemento.
  • Per input molto piccoli, ricadi su un'implementazione puramente seriale. L'overhead di split-and-fork è sprecato su input che entrano in cache.

fork(), join(), invoke()

Le tre operazioni su un RecursiveTask:

MetodoComportamento
fork()Pianifica il task nel pool corrente; ritorna immediatamente
join()Attende il task e restituisce il suo risultato (o rilancia l'eccezione)
invoke()Combinazione di fork + join per questo thread — sincrono
compute()Esegue il corpo direttamente sul thread chiamante (nessun fork)

Nel pattern sopra, left.fork(); right.compute(); left.join(); fa la cosa giusta — fork di una metà a un altro worker, esecuzione dell'altra metà qui, poi attesa che il fork finisca.

Non dovresti scrivere left.fork(); right.fork(); left.join(); right.join();. Il lato destro viene forkato e il worker corrente aspetta — non c'è nessun thread di esecuzione disponibile per eseguire effettivamente right finché il worker non raggiunge join. La combinazione spreca lo slot temporale del worker corrente.

Il pool comune

ForkJoinPool.commonPool() è un pool condiviso a livello di JVM, dimensionato a Runtime.getRuntime().availableProcessors() - 1 per impostazione predefinita. Alimenta:

  • Stream.parallelStream()
  • CompletableFuture.supplyAsync(supplier) (l'overload senza executor)
  • Arrays.parallelSort()

Puoi configurare la dimensione del pool comune tramite la proprietà di sistema java.util.concurrent.ForkJoinPool.common.parallelism all'avvio della JVM. Non dovresti usare il pool comune per I/O — una singola chiamata bloccante occupa un worker condiviso dall'intera JVM.

Work-stealing illustrato

worker-1 deque:  [t1 t2 t3 t4]            (it forked these; t4 just got pushed)
worker-2 deque:  []                       (empty — workers steal)
worker-3 deque:  [t10 t11]                (still has its own)

worker-2 finds its deque empty; steals t1 from the BOTTOM of worker-1's deque
worker-1 keeps pulling its own tasks from the TOP

Il deque a doppia estremità è il cuore del design: i worker inseriscono ed estraggono da un'estremità (LIFO — localizzazione di riferimento per i cache hit), i ladri prendono dall'altra estremità (FIFO — contesa minima con il proprietario). È per questo che fork/join funziona così bene: i worker raramente entrano in contesa sulle strutture dati degli altri anche sotto carico intenso.

Un esempio concreto: somma parallela vs. seriale

Il programma seguente somma un array di 10 milioni di elementi in due modi — ciclo seriale e ricorsione fork/join — e stampa il tempo di parete per ciascuno.

java— editable, runs on the server

Cosa ricavare dall'esecuzione:

  • La versione fork/join è risultata molte volte più veloce del ciclo seriale. Su una macchina con N core il limite superiore è circa — il numero effettivo è stato inferiore perché la JVM, il GC e altri thread JVM richiedevano anch'essi CPU, e il lavoro alla soglia non è perfettamente bilanciato. Tuttavia, un'accelerazione sostanziale per poche righe di codice ricorsivo.
  • Entrambe le somme erano uguali. Questo è il controllo di correttezza partition-and-merge: ogni foglia ha sommato la sua slice non sovrapposta; il passo di combinazione (l + r) le ha sommate; nessun doppio conteggio o data race perché ogni foglia ha scritto nella propria variabile locale.
  • La variante SumTiny con soglia 10 era più lenta del ciclo seriale. Con 10M di elementi suddivisi fino a blocchi di 10, si creano circa 2M di task — e l'overhead di gestione dei task supera di gran lunga il lavoro di addizione effettivo. La soglia è una vera manopola da regolare; esegui benchmark su input rappresentativi.
  • Il pattern left.fork(); long r = right.compute(); long l = left.join(); usa un task in meno rispetto a fork(); fork(); join(); join();. Il worker corrente ha tempo libero durante il compute() — usarlo per una delle metà risparmia un'intera allocazione di task. Questo è il piccolo ma cumulativo vantaggio su molti carichi di lavoro reali.
  • ForkJoinPool.commonPool() è stato l'executor usato in questa demo. Per un'esecuzione una tantum, il pool comune va bene. Per un programma a lunga esecuzione che mescola lavoro fork/join con chiamate parallel su stream e future asincroni, assegna al carico fork/join pesante il proprio pool — il pool comune è pensato per brevi raffiche, non per calcolo intensivo stabile.

Cosa c'è dopo

Il capitolo successivo, Collezioni Concorrenti Java, tratta le strutture dati progettate per essere utilizzate intensamente da più thread — ConcurrentHashMap, CopyOnWriteArrayList, BlockingQueue e il resto di java.util.concurrent.

Esercitazione

Pratica
All'interno di `compute()` di un `RecursiveTask`, hai due sotto-task `left` e `right`. Quale pattern di chiamata è l'idioma canonico del fork/join?
All'interno di `compute()` di un `RecursiveTask`, hai due sotto-task `left` e `right`. Quale pattern di chiamata è l'idioma canonico del fork/join?
Was this page helpful?