본문 바로가기

Study Information Technology

Python의 multiprocessing 모듈로 병렬 처리하기

728x90
반응형

Python의 multiprocessing 모듈로 병렬 처리하기

Overview

Python의 multiprocessing 모듈은 다중 코어 프로세서를 효과적으로 활용하여 병렬 처리를 가능하게 합니다. 이는 특히 CPU 집약적인 작업에서 성능을 크게 향상시킬 수 있습니다. 이 모듈은 프로세스 기반의 병렬 처리를 제공하며, 스레드 기반의 병렬 처리와는 다른 장점과 단점을 가지고 있습니다. 여기에서는 multiprocessing 모듈의 주요 구성 요소와 함께 기본적인 사용법과 일반적인 오류 처리 방법을 상세히 설명하겠습니다.


multiprocessing 모듈의 기본 구성 요소

1. Process 클래스

Process 클래스는 새로운 프로세스를 생성하고 실행할 수 있는 가장 기본적인 방법을 제공합니다. 각 프로세스는 독립적인 메모리 공간을 가지며, 서로의 메모리 공간에 직접 접근할 수 없습니다.

예제 코드:

from multiprocessing import Process

def worker(num):
""" 프로세스에서 실행될 작업 """
print(f'Worker: {num}')

if __name__ == '__main__':
processes = []
for i in range(5):
p = Process(target=worker, args=(i,))
p.start()
processes.append(p)

for p in processes:
p.join()

설명:

  • Process 객체를 생성할 때 target 인수는 실행할 함수(또는 메서드)를 지정합니다.
  • args는 함수에 전달할 인수를 튜플 형태로 전달합니다.
  • start() 메서드는 프로세스를 시작합니다.
  • join() 메서드는 프로세스가 종료될 때까지 기다립니다.

2. QueuePipe

QueuePipe는 프로세스 간의 통신을 위해 사용됩니다. 이들은 데이터를 안전하게 공유할 수 있는 방법을 제공합니다.

Queue 예제 코드:

from multiprocessing import Process, Queue

def worker(queue):
""" 큐에 값을 추가하는 작업 """
queue.put('Hello from worker')

if __name__ == '__main__':
queue = Queue()
p = Process(target=worker, args=(queue,))
p.start()
p.join()

# 큐에서 값 가져오기
print(queue.get())

Pipe 예제 코드:

from multiprocessing import Process, Pipe

def worker(conn):
""" 파이프를 통해 메시지를 보내는 작업 """
conn.send('Hello from worker')
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
p.join()

# 파이프에서 값 가져오기
print(parent_conn.recv())

설명:

  • Queue는 FIFO(First In First Out) 구조를 가지며, 여러 프로세스가 동시에 접근할 수 있습니다.
  • Pipe는 두 프로세스 간의 단방향 통신을 지원합니다.

3. Pool 클래스

Pool 클래스는 다수의 프로세스를 동시에 관리하며, 작업을 병렬로 처리할 수 있게 합니다. Pool은 주로 작업의 수가 많고 병렬로 처리할 수 있는 경우 유용합니다.

예제 코드:

from multiprocessing import Pool

def square(n):
""" 숫자를 제곱하는 함수 """
return n * n

if __name__ == '__main__':
with Pool(4) as p:
results = p.map(square, [1, 2, 3, 4, 5])
print(results)

설명:

  • Pool 객체를 생성할 때 프로세스의 수를 지정할 수 있습니다.
  • map() 메서드는 리스트의 각 항목에 대해 지정된 함수를 병렬로 적용합니다.

4. LockSemaphore

LockSemaphore는 다중 프로세스 환경에서의 데이터 경쟁을 방지하기 위한 동기화 도구입니다.

Lock 예제 코드:

from multiprocessing import Process, Lock

def worker(lock):
with lock:
print('Lock acquired by:', os.getpid())

if __name__ == '__main__':
lock = Lock()
processes = [Process(target=worker, args=(lock,)) for _ in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()

Semaphore 예제 코드:

from multiprocessing import Process, Semaphore

def worker(sem):
sem.acquire()
print('Semaphore acquired by:', os.getpid())
sem.release()

if __name__ == '__main__':
sem = Semaphore(2)
processes = [Process(target=worker, args=(sem,)) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()

설명:

  • Lock은 동시에 하나의 프로세스만 접근할 수 있도록 합니다.
  • Semaphore는 일정 수의 프로세스만 동시에 접근할 수 있도록 제어합니다.

일반적인 오류와 해결 방법

1. RuntimeError: context has already been set

원인:
이 오류는 multiprocessing 모듈의 set_start_method() 함수 호출이 불가능한 상황에서 발생할 수 있습니다.

해결 방법:
이 오류는 Python의 멀티프로세싱 시작 방법을 잘못 설정했을 때 발생합니다. 보통 이는 if __name__ == '__main__': 블록 내에서 모든 멀티프로세싱 관련 코드가 실행되도록 하면 해결할 수 있습니다.

2. EOFError: Ran out of input

원인:
이 오류는 프로세스 간의 통신에서 데이터가 예상보다 적게 전송되었을 때 발생할 수 있습니다.

해결 방법:
데이터를 송수신하는 코드에서 QueuePipe를 통해 보내는 데이터가 예상대로 전송되고 있는지 확인하세요. 전송 전 데이터의 일관성을 확인하는 것이 중요합니다.


참고문서

이 문서들이 multiprocessing 모듈을 깊이 이해하고 효과적으로 활용하는 데 도움이 되기를 바랍니다.

728x90
반응형