Pythonでスレッドプール・プロセスプールを使う(concurrent.futures)
この記事では、Pythonのconcurrent.futuresモジュールを使用して、スレッドプールとプロセスプールによる並列処理を行う方法を解説します。ThreadPoolExecutorとProcessPoolExecutorの使い方、そしてそれぞれの利点と欠点について説明します。
目次
- スレッドプール (ThreadPoolExecutor)
- プロセスプール (ProcessPoolExecutor)
- Executorの使用方法
- サンプルコード:数値の二乗計算
- スレッドとプロセスの使い分け
- エラーハンドリング
スレッドプール (ThreadPoolExecutor)
ThreadPoolExecutorは、複数のスレッドを用いてタスクを並列実行するプールです。CPUバウンドなタスクには向いていませんが、I/Oバウンドなタスク(ネットワークアクセスやディスクI/Oなど)の処理に適しています。
import concurrent.futures
import time
def task(n):
time.sleep(1) # I/Oバウンドなタスクをシミュレート
return n * n
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
print(results)
max_workers引数で、同時に実行するスレッド数を指定します。as_completed()を使うことで、タスクが完了する順に結果を取得できます。
プロセスプール (ProcessPoolExecutor)
ProcessPoolExecutorは、複数のプロセスを用いてタスクを並列実行するプールです。CPUバウンドなタスクに適しており、CPUコア数を最大限に活用できます。ただし、プロセスの生成と終了にはオーバーヘッドがかかります。
import concurrent.futures
import time
def task(n):
time.sleep(1) # CPUバウンドなタスクをシミュレート
return n * n
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(task, i) for i in range(10)]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
print(results)
max_workers引数で、同時に実行するプロセス数を指定します。
Executorの使用方法
ThreadPoolExecutorとProcessPoolExecutorの使い方はほぼ同じです。submit()メソッドでタスクをプールに投入し、as_completed()メソッドで結果を取得します。with文を使うことで、プールのリソースを適切に解放できます。
サンプルコード:数値の二乗計算
以下のコードは、0から9までの数値を二乗するタスクを、スレッドプールとプロセスプールで実行する例です。
import concurrent.futures
import time
def square(n):
return n * n
numbers = list(range(10))
# スレッドプール
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(executor.map(square, numbers))
print(f"スレッドプール: {results}, time: {time.time() - start_time}")
# プロセスプール
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(square, numbers))
print(f"プロセスプール: {results}, time: {time.time() - start_time}")
map()を使うことで、イテラブルなオブジェクトの各要素に対して関数を適用し、結果をイテレータとして取得できます。
スレッドとプロセスの使い分け
I/OバウンドなタスクにはThreadPoolExecutorを、CPUバウンドなタスクにはProcessPoolExecutorを使うのが一般的です。ただし、プロセスの生成にはオーバーヘッドがあるので、タスク数が少ない場合はThreadPoolExecutorの方が効率が良い場合があります。
エラーハンドリング
タスク実行中に例外が発生した場合、future.result()は例外を再送出します。try-exceptブロックで例外をキャッチする必要があります。
import concurrent.futures
def task(n):
if n == 5:
raise ValueError("nは5にできません")
return n * n
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(task, 5)
try:
result = future.result()
print(result)
except ValueError as e:
print(f"エラー: {e}")
例外処理を適切に行うことで、プログラムの堅牢性を高めることができます。