PYTHON MEBY

Pythonでスレッドプール・プロセスプールを使う(concurrent.futures)

この記事では、Pythonのconcurrent.futuresモジュールを使用して、スレッドプールとプロセスプールによる並列処理を行う方法を解説します。ThreadPoolExecutorとProcessPoolExecutorの使い方、そしてそれぞれの利点と欠点について説明します。

目次

スレッドプール (ThreadPoolExecutor)

ThreadPoolExecutorは、複数のスレッドを用いてタスクを並列実行するプールです。CPUバウンドなタスクには向いていませんが、I/Oバウンドなタスク(ネットワークアクセスやディスクI/Oなど)の処理に適しています。

import concurrent.futures
import time

def task(n):
    time.sleep(1)  # I/Oバウンドなタスクをシミュレート
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

print(results)

max_workers引数で、同時に実行するスレッド数を指定します。as_completed()を使うことで、タスクが完了する順に結果を取得できます。

プロセスプール (ProcessPoolExecutor)

ProcessPoolExecutorは、複数のプロセスを用いてタスクを並列実行するプールです。CPUバウンドなタスクに適しており、CPUコア数を最大限に活用できます。ただし、プロセスの生成と終了にはオーバーヘッドがかかります。

import concurrent.futures
import time

def task(n):
    time.sleep(1) # CPUバウンドなタスクをシミュレート
    return n * n

with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(task, i) for i in range(10)]
    results = [future.result() for future in concurrent.futures.as_completed(futures)]

print(results)

max_workers引数で、同時に実行するプロセス数を指定します。

Executorの使用方法

ThreadPoolExecutorとProcessPoolExecutorの使い方はほぼ同じです。submit()メソッドでタスクをプールに投入し、as_completed()メソッドで結果を取得します。with文を使うことで、プールのリソースを適切に解放できます。

サンプルコード:数値の二乗計算

以下のコードは、0から9までの数値を二乗するタスクを、スレッドプールとプロセスプールで実行する例です。

import concurrent.futures
import time

def square(n):
    return n * n

numbers = list(range(10))

# スレッドプール
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(square, numbers))
print(f"スレッドプール: {results}, time: {time.time() - start_time}")

# プロセスプール
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = list(executor.map(square, numbers))
print(f"プロセスプール: {results}, time: {time.time() - start_time}")

map()を使うことで、イテラブルなオブジェクトの各要素に対して関数を適用し、結果をイテレータとして取得できます。

スレッドとプロセスの使い分け

I/OバウンドなタスクにはThreadPoolExecutorを、CPUバウンドなタスクにはProcessPoolExecutorを使うのが一般的です。ただし、プロセスの生成にはオーバーヘッドがあるので、タスク数が少ない場合はThreadPoolExecutorの方が効率が良い場合があります。

エラーハンドリング

タスク実行中に例外が発生した場合、future.result()は例外を再送出します。try-exceptブロックで例外をキャッチする必要があります。

import concurrent.futures

def task(n):
    if n == 5:
        raise ValueError("nは5にできません")
    return n * n

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    try:
        result = future.result()
        print(result)
    except ValueError as e:
        print(f"エラー: {e}")

例外処理を適切に行うことで、プログラムの堅牢性を高めることができます。

関連記事