Python Threading
Impara il threading in Python: crea thread, sincronizza con lock, usa code, ThreadPoolExecutor e comprendi quando il GIL è rilevante.
Il modulo threading di Python consente di eseguire più attività nello stesso processo contemporaneamente. Ogni attività viene eseguita nel proprio thread — un'unità di esecuzione leggera che condivide lo spazio di memoria del processo. Il threading è lo strumento giusto quando il programma trascorre la maggior parte del tempo in attesa (lettura di un file, richiesta HTTP, interrogazione di un database) e si vuole svolgere lavoro utile durante quell'attesa invece di bloccarsi.
Questo capitolo tratta:
- Creazione e avvio di thread con
threading.Thread - Attesa del completamento dei thread con
join - Thread daemon e attività in background
- Prevenzione delle race condition con
Lockewith - Coordinamento dei thread con
EventeSemaphore - Comunicazione thread-safe usando
queue.Queue ThreadPoolExecutorper pool di thread gestiti- Il Global Interpreter Lock (GIL) e perché i thread non accelerano il codice CPU-bound
- Quando scegliere threading vs. asyncio
Creazione e avvio di un thread
Importa threading e crea un oggetto Thread, passando la funzione da eseguire come target. Chiama .start() per avviare il thread:
import threading
import time
def greet(name):
time.sleep(0.5) # simulate some work
print(f'Hello, {name}!')
t = threading.Thread(target=greet, args=('Alice',))
t.start()
print('Thread started — main continues running')
t.join() # wait for the thread to finish
print('Thread finished')
# Thread started — main continues running
# Hello, Alice!
# Thread finishedPunti chiave:
argsè una tupla di argomenti posizionali passati atarget. Usakwargsper gli argomenti keyword.- Senza
.join(), il thread principale può terminare prima che il thread avviato finisca. .start()ritorna immediatamente; il nuovo thread viene eseguito in modo concorrente.
Passaggio di argomenti keyword
import threading
def connect(host, port=80):
print(f'Connecting to {host}:{port}')
t = threading.Thread(target=connect, kwargs={'host': 'example.com', 'port': 443})
t.start()
t.join()
# Connecting to example.com:443Esecuzione di più thread contemporaneamente
Il vero vantaggio del threading è eseguire più attività in parallelo. Avvia tutti i thread prima, poi attendili tutti:
import threading
import time
def download(url):
time.sleep(1) # simulate a 1-second network request
print(f'Downloaded: {url}')
urls = [
'https://example.com/data1',
'https://example.com/data2',
'https://example.com/data3',
]
start = time.perf_counter()
threads = [threading.Thread(target=download, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
elapsed = time.perf_counter() - start
print(f'All downloads finished in {elapsed:.1f}s')
# Downloaded: https://example.com/data1
# Downloaded: https://example.com/data2
# Downloaded: https://example.com/data3
# All downloads finished in 1.0sSenza thread questo richiederebbe 3 secondi (sequenziale). Con tre thread ci vuole circa 1 secondo perché le attese si sovrappongono.
Sottoclassi di Thread
Per logica più complessa, estendi threading.Thread e sovrascrivi run(). Memorizza i risultati come attributi dell'istanza in modo che il codice chiamante possa leggerli dopo join():
import threading
import time
class DownloadThread(threading.Thread):
def __init__(self, url):
super().__init__()
self.url = url
self.result = None
def run(self):
time.sleep(0.5) # simulate download
self.result = f'Data from {self.url}'
threads = [DownloadThread(f'https://example.com/page{i}') for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
for t in threads:
print(t.result)
# Data from https://example.com/page0
# Data from https://example.com/page1
# Data from https://example.com/page2Thread daemon
Un thread daemon è un thread in background che l'interprete termina automaticamente quando tutti i thread non-daemon sono usciti. Contrassegna un thread come daemon passando daemon=True (oppure impostando t.daemon = True prima di .start()):
import threading
import time
def heartbeat():
while True:
print('♥ still running')
time.sleep(1)
t = threading.Thread(target=heartbeat, daemon=True)
t.start()
time.sleep(2.5)
print('Main thread exiting — daemon will be killed')
# ♥ still running
# ♥ still running
# Main thread exiting — daemon will be killedUsa i thread daemon per monitoraggio in background o attività di logging che non devono impedire l'uscita del programma. Non usarli mai per attività che devono completarsi in modo pulito (scritture su file, commit su database) — vengono terminati senza alcuna pulizia.
Nomi dei thread e introspezione
Ogni thread ha un nome. Puoi impostarlo esplicitamente o lasciare che Python ne assegni uno automaticamente. Usa threading.current_thread() per ispezionare il thread in esecuzione e threading.active_count() per contare i thread attivi:
import threading
def worker():
t = threading.current_thread()
print(f'Running in thread: {t.name}')
t = threading.Thread(target=worker, name='WorkerThread-1')
t.start()
t.join()
print(f'Active threads: {threading.active_count()}')
# Running in thread: WorkerThread-1
# Active threads: 1Sincronizzazione: prevenire le race condition
I thread condividono la memoria del processo. Quando due thread leggono e scrivono la stessa variabile contemporaneamente, il risultato è una race condition — un comportamento non deterministico difficile da riprodurre o fare il debug.
Il seguente esempio senza lock produce un conteggio finale imprevedibile perché gli incrementi di thread diversi possono sovrapporsi:
import threading
counter = 0
def unsafe_increment():
global counter
for _ in range(100_000):
counter += 1 # read-modify-write: not atomic!
threads = [threading.Thread(target=unsafe_increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
# counter is somewhere between 100000 and 500000 — unpredictable
print('Final counter:', counter)Lock
Un threading.Lock garantisce che solo un thread esegua la sezione protetta alla volta. Usalo come context manager con with in modo che il lock venga sempre rilasciato, anche se viene sollevata un'eccezione:
import threading
counter = 0
lock = threading.Lock()
def safe_increment():
global counter
for _ in range(100_000):
with lock: # acquire before read-modify-write
counter += 1 # now only one thread at a time can run this
threads = [threading.Thread(target=safe_increment) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print('Final counter:', counter) # always 500000RLock (lock rientrante)
Se un thread deve acquisire lo stesso lock due volte (ad esempio, un metodo chiama un altro metodo che acquisisce anche il lock), usa threading.RLock. Permette allo stesso thread di riacquisire il lock senza causare deadlock:
import threading
lock = threading.RLock()
def outer():
with lock:
print('Outer acquired')
inner() # inner also acquires the same lock
def inner():
with lock: # works because RLock counts acquisitions
print('Inner acquired')
t = threading.Thread(target=outer)
t.start()
t.join()
# Outer acquired
# Inner acquiredCoordinamento dei thread: Event e Semaphore
Event
threading.Event è un semplice segnale. Un thread chiama .set() per segnalare; gli altri thread chiamano .wait() per bloccarsi finché il segnale non arriva:
import threading
import time
ready = threading.Event()
def worker():
print('Worker: waiting for signal...')
ready.wait() # blocks here until ready.set() is called
print('Worker: signal received, starting work')
t = threading.Thread(target=worker)
t.start()
time.sleep(0.5)
print('Main: sending signal')
ready.set()
t.join()
# Worker: waiting for signal...
# Main: sending signal
# Worker: signal received, starting workUsa un Event per coordinare l'ordine di avvio — ad esempio, per ritardare i thread worker finché non viene stabilita una connessione al database.
Semaphore
Un threading.Semaphore limita il numero di thread che possono trovarsi contemporaneamente in una sezione. Questo è utile per limitare l'accesso a una risorsa condivisa come un pool di connessioni:
import threading
import time
# Allow at most 2 threads to enter the critical section at once
semaphore = threading.Semaphore(2)
def use_connection(name):
with semaphore:
print(f'{name}: using connection')
time.sleep(0.5)
print(f'{name}: releasing connection')
threads = [threading.Thread(target=use_connection, args=(f'T{i}',)) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
# T0: using connection
# T1: using connection <- only 2 at a time
# T0: releasing connection
# T2: using connection
# T1: releasing connection
# T3: using connection
# T2: releasing connection
# T3: releasing connectionDati locali al thread
threading.local() crea un oggetto che mantiene valori separati per ogni thread. Questo è utile per cache per-thread o cursori di database:
import threading
local_data = threading.local()
def set_user(name):
local_data.user = name # each thread writes its own copy
print(f'{threading.current_thread().name}: user = {local_data.user}')
threads = [
threading.Thread(target=set_user, args=(f'user{i}',), name=f'Thread-{i}')
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()
# Thread-0: user = user0
# Thread-1: user = user1
# Thread-2: user = user2Leggere local_data.user in un thread dove non è mai stato impostato solleva AttributeError, come qualsiasi altro accesso ad un attributo.
Code thread-safe
La classe queue.Queue (dal modulo queue della libreria standard, non da asyncio) è una FIFO thread-safe. I thread possono eseguire put e get di elementi senza un lock — tutta la sincronizzazione è gestita internamente.
Il pattern classico è produttore-consumatore: uno o più thread produttori generano lavoro, i thread consumatori lo elaborano:
import threading
import queue
import time
q = queue.Queue(maxsize=5)
def producer():
for i in range(1, 5):
q.put(f'item-{i}')
print(f'Produced item-{i}')
time.sleep(0.05)
def consumer():
while True:
item = q.get()
if item is None: # sentinel: stop when None is received
break
print(f'Consumed {item}')
q.task_done()
prod = threading.Thread(target=producer)
cons = threading.Thread(target=consumer)
cons.start()
prod.start()
prod.join()
q.put(None) # signal consumer to stop
cons.join()
# Produced item-1
# Consumed item-1
# Produced item-2
# Consumed item-2
# Produced item-3
# Consumed item-3
# Produced item-4
# Consumed item-4queue.Queue offre anche task_done() e join() per tracciare quando tutti gli elementi in coda sono stati elaborati, e queue.LifoQueue / queue.PriorityQueue per ordinamenti alternativi.
ThreadPoolExecutor: pool di thread gestiti
Creare un nuovo oggetto Thread per ogni attività è dispendioso quando si hanno molte attività di breve durata. concurrent.futures.ThreadPoolExecutor gestisce un pool di thread worker riutilizzabili e restituisce oggetti Future per ogni attività inviata:
import concurrent.futures
import time
def fetch_url(url):
time.sleep(0.5) # simulate network I/O
return f'Response from {url}'
urls = [
'https://api.example.com/users',
'https://api.example.com/posts',
'https://api.example.com/comments',
]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# submit all tasks and get Future objects
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in concurrent.futures.as_completed(futures):
url = futures[future]
print(future.result())
# Response from https://api.example.com/users (order may vary)
# Response from https://api.example.com/comments
# Response from https://api.example.com/postsexecutor.map(fn, iterable) è una forma più breve quando non hai bisogno di oggetti Future individuali:
import concurrent.futures
import time
def square(n):
time.sleep(0.01)
return n * n
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(square, range(10)))
print(results)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]executor.map preserva l'ordine dell'input nel suo output, a differenza di as_completed che restituisce nell'ordine di completamento.
Il Global Interpreter Lock (GIL)
CPython (l'interprete Python standard) ha un Global Interpreter Lock — un mutex che permette a un solo thread alla volta di eseguire bytecode Python. Ciò significa che i thread in CPython non possono eseguire codice Python in vero parallelo su più core CPU.
L'implicazione pratica:
- Attività I/O-bound: i thread accelerano genuinamente il programma. Mentre un thread attende una risposta di rete, il GIL viene rilasciato e un altro thread viene eseguito. Gli esempi precedenti dimostrano tutti questo comportamento.
- Attività CPU-bound: i thread non accelerano le cose e potrebbero essere addirittura leggermente più lenti a causa dell'overhead del cambio di contesto.
import threading
import time
def cpu_bound(n):
total = 0
for i in range(n):
total += i
return total
# Sequential
start = time.perf_counter()
cpu_bound(5_000_000)
cpu_bound(5_000_000)
single = time.perf_counter() - start
# Two threads — GIL prevents true parallelism
start = time.perf_counter()
t1 = threading.Thread(target=cpu_bound, args=(5_000_000,))
t2 = threading.Thread(target=cpu_bound, args=(5_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start
print(f'Single-threaded: {single:.2f}s')
print(f'Two threads: {threaded:.2f}s')
# Two threads are NOT faster (similar elapsed time)Per il vero parallelismo CPU in Python, usa multiprocessing o concurrent.futures.ProcessPoolExecutor — ogni processo ha il proprio GIL.
Threading vs. asyncio
Sia threading che asyncio rendono più veloci i programmi I/O-bound, ma funzionano in modo diverso:
threading | asyncio | |
|---|---|---|
| Modello di concorrenza | Preemptivo — il sistema operativo cambia i thread | Cooperativo — le coroutine cedono con await |
| Ideale per | Librerie di terze parti bloccanti | Librerie con supporto async (aiohttp, asyncpg) |
| Stato condiviso | Richiede lock espliciti | Sicuro all'interno di un singolo event loop |
| Overhead | Un thread OS per attività | Molto basso — migliaia di coroutine su un thread |
| Curva di apprendimento | Familiare (codice in stile sincrono) | Richiede async/await ovunque |
Regola pratica: se stai usando una libreria che ha una versione compatibile con async (ad esempio, aiohttp invece di requests), usa asyncio. Se sei vincolato a librerie bloccanti sincrone, usa il threading. Per lavoro CPU-bound, usa multiprocessing.
Errori comuni
Avviare un thread due volte. Chiamare .start() sullo stesso oggetto Thread più di una volta solleva RuntimeError. Crea una nuova istanza Thread per ogni esecuzione.
Dimenticare il join. Un thread senza join potrebbe essere ancora in esecuzione quando il programma termina. Chiama sempre join sui thread il cui completamento è importante, oppure rendili daemon se sono davvero fire-and-forget.
Tenere un lock troppo a lungo. Bloccare una sezione di codice ampia vanifica lo scopo della concorrenza. Mantieni le sezioni bloccate il più brevi possibile — proteggi solo l'operazione effettiva di lettura-modifica-scrittura.
Deadlock. Un deadlock si verifica quando due thread mantengono ciascuno un lock che l'altro sta aspettando. Prevenilo acquisendo sempre più lock nello stesso ordine in tutti i thread.
import threading
lock_a = threading.Lock()
lock_b = threading.Lock()
# DEADLOCK: Thread 1 holds lock_a, waits for lock_b
# Thread 2 holds lock_b, waits for lock_a
# FIX: always acquire locks in the same order (lock_a then lock_b) in every threadModificare una lista mentre si itera in un altro thread. Avvolgi tutti gli accessi (lettura e scrittura) alle collezioni condivise con un lock per evitare RuntimeError: list changed size during iteration.
Riepilogo rapido
| Strumento | Scopo |
|---|---|
threading.Thread(target=fn, args=(...)) | Crea un nuovo thread |
t.start() | Avvia il thread |
t.join() | Attende il completamento del thread |
t.daemon = True | Contrassegna come thread in background (terminato all'uscita) |
threading.Lock() | Esclusione reciproca — un solo thread alla volta |
threading.RLock() | Lock rientrante — lo stesso thread può acquisirlo più volte |
threading.Event() | Segnale monouso tra thread |
threading.Semaphore(n) | Limita a n thread concorrenti in una sezione |
threading.local() | Storage per-thread |
queue.Queue | FIFO thread-safe per pattern produttore-consumatore |
ThreadPoolExecutor(max_workers=n) | Pool gestito di thread worker riutilizzabili |