본문 바로가기

Study Information Technology

스레드 안전한 데이터 교환을 위한 Python의 queue 모듈 구현

728x90
반응형

스레드 안전한 데이터 교환을 위한 Python의 queue 모듈 구현

Overview

Python의 queue 모듈은 스레드 간 데이터 교환을 안전하게 처리하기 위해 설계되었습니다. 이 모듈은 여러 스레드가 동시에 데이터를 읽거나 쓸 수 있도록 보장하며, 데이터 교환의 복잡성을 단순화합니다. queue 모듈의 주요 구성 요소는 Queue, LifoQueue, PriorityQueue입니다. 각각의 큐는 다른 특성과 용도로 설계되었으며, 이들 큐를 활용하여 효과적으로 스레드 간의 데이터 교환을 관리할 수 있습니다.


1. Queue 클래스

Queue 클래스는 선입선출(FIFO) 방식으로 데이터를 처리합니다. 이는 데이터가 큐에 삽입된 순서대로 처리되도록 보장합니다. 이 클래스는 스레드 간 안전하게 데이터 교환을 가능하게 하며, put()get() 메서드를 사용하여 데이터를 삽입하거나 추출합니다.

예제

import queue
import threading
import time

def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
time.sleep(1)

def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
q.task_done()

q = queue.Queue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

t1.join()
q.put(None)  # Signal the consumer to exit
t2.join()

위 예제에서 producer 함수는 큐에 데이터를 추가하고, consumer 함수는 큐에서 데이터를 가져옵니다. task_done() 메서드는 큐의 작업이 완료되었음을 알립니다.

2. LifoQueue 클래스

LifoQueue는 후입선출(LIFO) 방식으로 데이터를 처리합니다. 가장 최근에 삽입된 데이터가 가장 먼저 추출됩니다. 이는 스택(Stack)과 유사한 동작을 합니다.

예제

import queue
import threading
import time

def producer(q):
for i in range(5):
q.put(i)
print(f'Produced {i}')
time.sleep(1)

def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f'Consumed {item}')
q.task_done()

q = queue.LifoQueue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

t1.join()
q.put(None)  # Signal the consumer to exit
t2.join()

이 예제에서 producerconsumerLifoQueue를 사용하여 데이터를 추가하고 추출합니다. 큐는 후입선출 방식으로 작동하여 가장 최근에 삽입된 데이터가 먼저 처리됩니다.

3. PriorityQueue 클래스

PriorityQueue는 우선순위 큐로, 각 항목이 우선순위와 함께 삽입됩니다. 항목은 우선순위에 따라 추출되며, 우선순위가 낮은 항목이 먼저 처리됩니다. 이 큐는 주로 우선순위가 있는 작업을 처리하는 데 유용합니다.

예제

import queue
import threading
import time

def producer(q):
for i in range(5):
q.put((i, f'item{i}'))
print(f'Produced item{i}')
time.sleep(1)

def consumer(q):
while True:
priority, item = q.get()
if item is None:
break
print(f'Consumed {item}')
q.task_done()

q = queue.PriorityQueue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

t1.join()
q.put((float('inf'), None))  # Signal the consumer to exit
t2.join()

이 예제에서 producer는 우선순위와 함께 항목을 큐에 추가하며, consumer는 우선순위에 따라 항목을 추출합니다. float('inf')를 사용하여 종료 신호를 전달합니다.


에러와 해결책

  1. queue.Empty 오류
  • 원인: 큐에서 데이터를 가져오려고 할 때 큐가 비어있습니다.
  • 해결책: queue.get() 메서드에 timeout 매개변수를 설정하여 큐가 비어있을 때의 대기 시간을 조정하거나, queue.Empty 예외를 처리하여 대처할 수 있습니다.
try:
item = q.get(timeout=1)
except queue.Empty:
print('Queue is empty')
  1. queue.Full 오류
  • 원인: 큐에 데이터를 추가하려고 할 때 큐가 가득 차 있습니다.
  • 해결책: queue.put() 메서드에 timeout 매개변수를 설정하여 큐에 여유 공간이 생길 때까지 기다리거나, queue.Full 예외를 처리하여 대처할 수 있습니다.
try:
q.put(item, timeout=1)
except queue.Full:
print('Queue is full')

참고문서

728x90
반응형