Python Thread 기본
Python 에서 스레드를 구현하기 위해서는 일반적으로 threading 라이브러리를 사용한다.
threading.Thread 함수로 스레드로 실행할 함수와 인자를 넣고, start 를 이용해서 스레드를 실행시킨다.
스레드 동작 순서
- 스레드를 호출한 함수가 끝난 이후에 스레드가 종료된다.
import threading
import logging # 외부 출력으로 print 대신 logging 사용
import time
def helper_function(name, m_log : logging.Logger):
m_log.info('function {} started'.format(name))
time.sleep(2)
m_log.info('function {} finished'.format(name))
# 원래 logging은 WARNING 레벨 까지만 출력이 되는데, 출력 레벨을 바꿔줄 수 있음
logging.basicConfig(level=logging.INFO)
my_logger = logging.getLogger("my")
my_logger.info('Main : Started')
x = threading.Thread(target=helper_function, args=(1, my_logger))
my_logger.info('Main : Call Thread {}'.format(1))
x.start()
my_logger.info('Main : Finished ')INFO:my:Main : Started
INFO:my:Main : Call Thread 1
INFO:my:function 1 started
INFO:my:Main : FinishedB) Daemon Thread
백그라운드에서 동작하는 프로세스를 daemon 이라고 부른다.
- daemon 을 호출한 프로그램이 종료되면 그 즉시 shut down 된다.
Main 이 종료된 이후, Thread 도 바로 종료되어서 INFO:my:function 1 finished 를 출력하지 못한 모습을 확인할 수 있다.
x = threading.Thread(target=helper_function, args=(1, my_logger),daemon=True)
my_logger.info('Main : Call Thread {}'.format(1))
x.start()
my_logger.info('Main : Finished ')INFO:my:Main : Call Thread 1
INFO:my:function 1 started
INFO:my:Main : FinishedC) join()
join() 은 특정 thread 에게 다른 thread 가 종료될 때 까지 기다리라고 말하는 것을 의미한다.
x.start() 후, x.join() 을 호출하면 main 스레드는 x 스레드가 끝날 때까지 기다린다.
my_logger.info('Main : Started')
x = threading.Thread(target=helper_function, args=(1, my_logger))
my_logger.info('Main : Call Thread {}'.format(1))
x.start()
my_logger.info('Main : Wait for the thread to finish')
x.join() # x 가 끝날때 까지 Main 스레드에게 기다리라고 하기
my_logger.info('Main : Finished ')INFO:my:Main : Started
INFO:my:Main : Call Thread 1
INFO:my:function 1 started
INFO:my:Main : Wait for the thread to finish
INFO:my:function 1 finished
INFO:my:Main : FinishedD) 다중 스레드 실행
D.1) For Loop 방식
단순히 for loop 만 사용하는 방법. 하지만 이런 방식에는 다음과 같은 문제점이 있다.
- 실행되는 많은 스레드들은 OS 가 정한 순서에 의해 적절히 동작하므로, 각 스레드가 언제 종료될지 예측하기 힘든 문제가 발생한다.
threads = list()
for i in range(3):
my_logger.info('Main : create and start thread {}'.format(i + 1))
x = threading.Thread(target=helper_function, args=(i + 1, my_logger))
threads.append(x) # threads를 list에 담고 시작
x.start()
for idx, thread in enumerate(threads):
logging.info("Main : Before joining Thread {}".format(idx + 1))
thread.join()
logging.info("Main : Thread {} done".format(idx + 1))D.2) ThreadPoolExecutor
ThreadPoolExecutor 는 실행할 스레드마다 context manager 역할을 하는 executor 를 만들어 내는 역할을 한다.
executor 는 마지막에 join() 을 이용해서 다른 thread 간 실행 간격을 적절히 조절한다.
일반적으로 다중 스레드를 다룰 때 추천되는 방법이다.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for result in executor.map(helper_function, list(range(3)), [my_logger for _ in range(3)]):
if result is not None: # 스레드 실행에 문제가 없다면 None 출력
print(result) # result 를 출력함으로써 executor의 map 실행 결과를 알 수 있다.
# 이렇게 직접 출력하지 않으면, 에러가 발생해도 알려주지 않는다.E) Race Condition
스레드를 다루다보면, Race Condition 이 발생할 수 있다. race condition 은 두개 이상의 스레드가 공유된 자원에 동시에 접근하려다 발생하는 문제다.
class FakeDatabase:
def __init__(self):
self.value = 0
def update(self, name):
"""
객체의 value 의 값을 받아서 1 상승 시킨 후, 0.1초 후에 수정된 값을 입력한다.
"""
logging.info("Thread %s: starting update", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", name)Race condition 발생 예시:
database = FakeDatabase()
my_logger.info("Testing update. Starting value is {}".format(database.value))
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for i in range(2):
executor.submit(database.update, i)
my_logger.info("Testing update. Ending value is {}".format(database.value))INFO:my:Testing update. Starting value is 0
INFO:root:Thread 0: starting update
INFO:root:Thread 1: starting update
INFO:root:Thread 0: finishing update
INFO:root:Thread 1: finishing update
INFO:my:Testing update. Ending value is 1원래라면 값이 2 가 되어야 한다 (thread 0 이 +1, thread 1 이 +1 해서 총 0+1+1=2).
왜 이런 현상이 발생하는가?
time.sleep에 의해 첫번째로 실행된 thread 는 멈추게되고, 두번째 thread 를 실행시키게 허락한다.- 이후 첫번째 스레드가 업데이트된 값을 다시 넣기 전에, 두번째 스레드가 업데이트되기 전의 값을 가져간다.
- 즉, 동일한 값 0 을 각 스레드에서 가져가고 그 값을 서로 업데이트 한다 (0 -> 1).
F) Lock 을 이용한 동기화
mutex 개념을 가진 lock 을 활용하면, race condition 을 해결할 수 있다.
- mutex: 공유된 자원을 오직 하나의 스레드만 접근 가능하도록 제한
- semaphore: 공유된 자원을 정의된 개수만큼의 스레드만 접근 가능하도록 제한 (변수 개념)
스레드는 threading.Lock() 의 acquire 함수를 실행시켜서 lock 을 획득하고, 작업을 마친 후 release 를 통해 lock 을 해제한다.
- 물론
lock이 해제되지 않는다면, 기다리는 스레드는 무한히 기다려야 할것이다.
class FakeDatabaseREV1:
def __init__(self):
self.value = 0
self._lock = threading.Lock() # mutex
def update(self, name):
logging.info("Thread %s: starting update", name)
logging.debug("Thread %s: about to lock", name)
with self._lock: # acquire 과 release를 with statement로 해결함
logging.debug("Thread %s has lock", name)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.debug("Thread %s about to release lock", name)
logging.debug("Thread %s after release", name)
logging.info("Thread %s: finishing update", name)with statement 참고:
# 아래 두 코드는 동일하다
some_lock.acquire()
try:
# do something...
finally:
some_lock.release()
with some_lock:
# do something...이 방법의 가장 큰 문제는 deadlock 이 발생할 수 있다는 점이다.
deadlock 은 종종 개발자가 .release() 를 호출하는 것을 까먹어서 생기는 경우가 많다.
일반적으로 context manager (executor) 를 사용하면 이러한 문제를 자동으로 처리해준다.
G) threading.Semaphore
세마포어는 mutex 와 비슷하게 동작한다. 즉, 스레드는 lock 을 걸고, lock 이 해제되지 않는 한 다른 스레드는 공통된 자원에 접근하지 못한다.
하지만, 세마포어는 공용 자원에 접근할 수 있는 스레드의 개수를 한 개 이상으로 설정할수 있다.
세마포어는 변수의 개념을 가진다. 만약 세마포어가 n 이고, 누군가 세마포어 (lock) 를 획득 (acquire) 한다면, 세마포어 크기는 n-1 이 된다. 이 값이 0 이 될때까지 세마포어는 공용 자원에 스레드들이 접근하는 것을 허락한다.
또한, 작업을 마친 스레드가 세마포어를 포기 (release) 한다면, 세마포어 값이 1 증가한다.
from threading import Semaphore
class FooSemaphore:
def __init__(self):
self.gates = Semaphore(0)
def first(self):
logging.info('first')
self.gates.release()
def second(self):
with self.gates:
logging.info('second')
foo = FooSemaphore()
y = threading.Thread(target=foo.second)
y.start()
x = threading.Thread(target=foo.first)
x.start()비록 두번째 thread 가 먼저 실행되었다 할지라도, 세마포어에 의해 첫번째 스레드가 실행되기를 기다린다.
이후, 첫번째 스레드가 release 를 통해 세마포어를 증가 (0->1) 시키면, 두번째 스레드가 동작 (acquire) 한다.
H) threading.Barrier
Barrier 는 고정된 스레드가 동시에 실행될 수 있도록 도와주는 객체다.
threading.Barrier(parties, action=None, timeout=None) 를 통해 초기화되고, 모든 스레드는 wait() 함수를 통해 parties + 1 만큼의 스레드가 모이길 기다린다.
from threading import Barrier
class Foo:
def __init__(self):
self.first_barrier = Barrier(1, timeout=5)
def first(self, printFirst):
printFirst()
self.first_barrier.wait() # 먼저 barrier 앞에서 기다림
def second(self, printSecond):
self.first_barrier.wait() # 그다음 스레드도 barrier 에서 기다림 (2개가 모였으므로 barrier 해제)
printSecond()
foo = Foo()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
for idx, func in enumerate([foo.first, foo.second]):
executor.submit(func, print(idx))첫번째 스레드는 자신의 인자로 주어진 함수를 실행시킨 후, 다른 스레드도 barrier 앞에서 기다리길 원한다. 두번째 스레드가 최종적으로 wait 을 실행하면, 그제서야 barrier 는 해제되며, 두번째 스레드는 printSecond() 를 실행한다.