PYTHON MEBY

Pythonでスレッド間・プロセス間通信

この記事では、Pythonでスレッド間およびプロセス間通信を行うための様々な方法について解説します。threadingモジュール、multiprocessingモジュール、Queue、Pipe、そして共有メモリなどを用いた例を通して、それぞれのメリット・デメリットを理解し、適切な手法を選択できるよう支援します。

目次

スレッド間通信

スレッド間通信は、同一プロセス内にある複数のスレッド間でデータのやり取りを行うための方法です。Pythonでは、グローバル変数、Queueオブジェクトなどを利用できます。グローバル変数を使う場合は、適切なロック機構を用いてデータ競合を防ぐ必要があります。

import threading
import time

global_data = 0
lock = threading.Lock()

def worker(thread_id):
    global global_data
    for i in range(5):
        with lock:
            global_data += 1
        time.sleep(0.1)
        print(f"Thread {thread_id}: global_data = {global_data}")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
print(f"Final global_data: {global_data}")

上記コードでは、threading.Lock()を用いてグローバル変数へのアクセスを制御し、データ競合を防いでいます。

プロセス間通信

プロセス間通信は、異なるプロセス間でデータのやり取りを行うための方法です。Pythonでは、multiprocessingモジュールが提供するQueue、Pipe、共有メモリなどを利用できます。

import multiprocessing

def worker(q):
    q.put([1, 2, 3])

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    data = q.get()
    print(data)

この例では、multiprocessing.Queue()を用いて、親プロセスと子プロセス間でデータの送受信を行っています。

Queueを使った通信

Queueは、複数のプロセスやスレッド間でデータを安全にやり取りするための仕組みです。FIFO(先入れ先出し)のキューとして機能します。

import multiprocessing

def producer(q):
    for i in range(5):
        q.put(i)
        time.sleep(0.2)

def consumer(q):
    while True:
        try:
            item = q.get(True, 1)
            print(f"Consumed: {item}")
        except queue.Empty:
            break

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

この例では、producerプロセスがQueueにデータを投入し、consumerプロセスがQueueからデータを取り出しています。

Pipeを使った通信

Pipeは、双方向の通信チャネルを提供します。親プロセスと子プロセス間、あるいはスレッド間で通信するのに適しています。

import multiprocessing

def worker(conn):
    conn.send([4, 5, 6])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=worker, args=(child_conn,))
    p.start()
    data = parent_conn.recv()
    p.join()
    print(data)

この例では、Pipeを使って親プロセスと子プロセス間でデータを送受信しています。

共有メモリを使った通信

共有メモリは、複数のプロセスが同じメモリ領域を共有することで、高速な通信を実現します。しかし、データ競合に注意する必要があります。

import multiprocessing
import time

value = multiprocessing.Value('i', 0)

def worker(val):
    for i in range(5):
        with val.get_lock():
            val.value += 1
        time.sleep(0.1)

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, args=(value,))
    p.start()
    p.join()
    print(value.value)

この例では、multiprocessing.Value()を使って共有メモリを定義し、複数のプロセスからアクセスしています。

Managerを使った通信

Managerオブジェクトは、複数のプロセス間で共有できる様々なオブジェクト(リスト、辞書など)を作成するための仕組みです。

import multiprocessing

def worker(d, lock):
    with lock:
      d['count'] += 1

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        d = manager.dict({'count': 0})
        lock = manager.Lock()
        p1 = multiprocessing.Process(target=worker, args=(d, lock))
        p2 = multiprocessing.Process(target=worker, args=(d, lock))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print(d['count'])

この例では、Managerオブジェクトを使って、複数のプロセス間で共有できる辞書を作成しています。

選択基準と注意点

適切な通信方法の選択は、アプリケーションの要件によって異なります。データ量、通信頻度、リアルタイム性の要件などを考慮する必要があります。また、共有メモリを使用する場合は、データ競合を防ぐための適切なロック機構を用いることが重要です。

関連記事