Python 제너레이터와 코루틴을 활용한 고급 기법
Wenhao Wang
Dev Intern · Leapcell

비동기 Python 프로그래밍 소개
현대 소프트웨어 개발 환경에서 효율성과 응답성은 매우 중요합니다. 작업이 순차적으로 실행되는 전통적인 동기 프로그래밍은 특히 네트워크 요청이나 파일 접근과 같은 I/O 바운드 작업에서 병목 현상을 일으킬 수 있습니다. 바로 여기서 비동기 프로그래밍이 빛을 발하며, 프로그램이 메인 실행 스레드를 차단하지 않고 여러 작업을 동시에 수행할 수 있도록 합니다. Python은 효율적이고 확장 가능한 비동기 애플리케이션 구축의 기본이 되는 강력한 제너레이터 및 코루틴과 같은 구문을 제공합니다. 고급 사용법을 이해하면 복잡한 작업을 처리하고, 정교한 데이터 처리 파이프라인을 구축하며, 애플리케이션 성능을 크게 향상시킬 수 있는 새로운 가능성을 열어줍니다. 이 글은 Python 제너레이터와 코루틴의 고급 기법을 자세히 살펴보고, 이를 활용하여 더 우아하고 동시적이며 고성능의 코드를 작성하는 방법을 보여줍니다.
동시 실행의 핵심 개념
고급 활용 사례를 자세히 살펴보기 전에, 논의의 기반이 되는 핵심 개념을 간략히 복습해 보겠습니다.
- 제너레이터(Generator): 이터레이터(iterator) 객체를 반환하는 특수한 유형의 함수입니다.
yield키워드를 사용하여 실행을 일시 중지하고 값을 내보내며,next()를 호출하면 중단된 지점에서 다시 시작합니다. 제너레이터는 전체 목록을 메모리에 구축하는 대신 요청 시 값을 생성하므로 메모리 효율적입니다. - 코루틴(Coroutine): 서브루틴의 일반화입니다. 서브루틴과 달리 코루틴은 실행을 일시 중지했다가 나중에 중단된 지점에서 다시 시작할 수 있습니다. Python에서는 제너레이터를 코루틴으로 사용할 수 있으며, 특히
yield from구문을 사용하면 하위 제너레이터에 위임할 수 있습니다. Python의async/await키워드는asyncio프레임워크 내에서 코루틴을 정의하고 작업하는 데 더 명시적이고 전용적인 구문을 제공합니다. - 이벤트 루프(Event Loop): 비동기 시스템의 핵심입니다. 다양한 작업을 모니터링하고 준비가 되었을 때 실행되도록 예약하여 코루틴의 실행 흐름을 효과적으로 관리합니다.
- 비동기 I/O(Async I/O): 프로그램이 I/O 작업이 완료되기를 기다리는 동안 다른 작업을 계속할 수 있도록 허용하는 입력/출력 처리의 한 형태입니다. 이는 논블로킹 작업에 중요합니다.
고급 제너레이터 패턴
제너레이터는 단순한 반복 작업용이 아니라 강력한 데이터 처리 파이프라인을 구축하는 데 사용될 수 있습니다.
제너레이터를 이용한 데이터 파이프라이닝
대규모 로그 파일을 처리해야 하는 시나리오를 생각해 봅시다. 줄을 필터링하고, 특정 정보를 추출하고, 그런 다음 형식을 지정해야 합니다. 연결된 제너레이터 표현식이나 함수를 사용하면 이를 효율적으로 수행할 수 있습니다.
import re def read_log_file(filepath): """제너레이터를 사용하여 로그 파일에서 줄을 읽습니다.""" with open(filepath, 'r') as f: for line in f: yield line.strip() def filter_errors(lines): """'ERROR'를 포함하는 줄을 필터링합니다.""" for line in lines: if "ERROR" in line: yield line def extract_timestamps(error_lines): """오류 줄에서 타임스탬프를 추출합니다.""" timestamp_pattern = re.compile(r"[\[](\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})[ \]]") for line in error_lines: match = timestamp_pattern.search(line) if match: yield match.group(1) # 예시 사용법 # 데모를 위한 더미 로그 파일 생성 with open('sample.log', 'w') as f: f.write("[2023-10-26 10:00:01] INFO User logged in\n") f.write("[2023-10-26 10:00:05] ERROR Failed to connect to DB\n") f.write("[2023-10-26 10:00:10] DEBUG Processing request\n") f.write("[2023-10-26 10:00:15] ERROR Invalid input data\n") log_lines = read_log_file('sample.log') filtered_errors = filter_errors(log_lines) error_timestamps = extract_timestamps(filtered_errors) print("Error Timestamps:") for ts in error_timestamps: print(ts)
이 예제에서 각 함수는 이전 단계에서 데이터를 소비하고 다음 단계에 대해 변환된 데이터를 생산하는 제너레이터입니다. 이렇게 하면 데이터가 한 번에 한 항목씩 지연 처리되므로 메모리 효율적인 파이프라인이 생성됩니다. 중간 목록은 생성되지 않으며, 이는 대규모 데이터셋에 중요합니다.
유한 상태 기계로서의 제너레이터
제너레이터는 send()를 통해 값을 생성하고 입력을 받아 간단한 유한 상태 기계 역할을 할 수 있습니다. 이렇게 하면 단일 제너레이터 함수가 외부 이벤트에 따라 내부 상태를 관리할 수 있습니다.
특정 토큰에 따라 모드를 전환하는 간단한 파서를 고려해 봅시다.
def state_machine_parser(): state = "INITIAL" while True: token = yield state # 현재 상태를 생성하고 다음 토큰을 받습니다. if state == "INITIAL": if token == "START_BLOCK": state = "IN_BLOCK" elif token == "END_STREAM": print("Stream ended during INITIAL state.") return else: print(f"Ignoring token '{token}' in INITIAL state.") elif state == "IN_BLOCK": if token == "PROCESS_ITEM": print("Processing item inside block.") elif token == "END_BLOCK": state = "INITIAL" elif token == "END_STREAM": print("Stream ended during IN_BLOCK state.") return else: print(f"Handling token '{token}' inside block.") # 상태 기계 초기화 parser = state_machine_parser() next(parser) # 제너레이터를 시작하고 "INITIAL"을 생성합니다. print(parser.send("SOME_DATA")) # 출력: Ignoring token 'SOME_DATA' in INITIAL state. print(parser.send("START_BLOCK")) # 출력: IN_BLOCK print(parser.send("PROCESS_ITEM")) # 출력: Processing item inside block. print(parser.send("ANOTHER_ITEM")) # 출력: Handling token 'ANOTHER_ITEM' inside block. print(parser.send("END_BLOCK")) # 출력: INITIAL print(parser.send("END_STREAM")) # 출력: Stream ended during INITIAL state.
state_machine_parser 제너레이터는 현재 상태를 생성하고 자신에게 전송된 토큰을 소비합니다. 토큰과 현재 상태에 따라 새 상태로 전환하거나 작업을 수행합니다. 이 패턴은 이벤트 기반 시스템이나 프로토콜 파싱에 효과적입니다.
Asyncio를 사용한 코루틴
asyncio 라이브러리는 async/await 구문과 함께 Python의 주요 비동기 프로그래밍 프레임워크를 제공합니다. yield 제너레이터를 코루틴으로 사용할 수 있지만, async def 코루틴은 asyncio 이벤트 루프와 더 명시적으로 통합됩니다.
비동기 태스크 구축
코루틴은 이벤트 루프에 의해 실행됩니다. await는 하나의 코루틴, Future 또는 Task가 완료될 때까지 코루틴의 실행을 일시 중지하는 데 사용됩니다.
import asyncio import time async def fetch_data(delay, item_id): """비동기 네트워크 요청을 시뮬레이션합니다.""" print(f"[{time.time():.2f}] Start fetching data for item {item_id}") await asyncio.sleep(delay) # I/O 바운드 작업을 시뮬레이션합니다. print(f"[{time.time():.2f}] Finished fetching data for item {item_id}") return f"Data for {item_id} after {delay} seconds" async def main(): start_time = time.time() # 동시에 실행되는 여러 태스크 생성 task1 = asyncio.create_task(fetch_data(3, "A")) task2 = asyncio.create_task(fetch_data(1, "B")) task3 = asyncio.create_task(fetch_data(2, "C")) # 모든 태스크가 완료될 때까지 대기 results = await asyncio.gather(task1, task2, task3) print("\nAll tasks completed.") for res in results: print(res) end_time = time.time() print(f"Total execution time: {end_time - start_time:.2f} seconds") # 메인 코루틴 실행 if __name__ == "__main__": asyncio.run(main())
이 예제에서 fetch_data는 데이터를 가져오는 것을 시뮬레이션하는 async 코루틴입니다. main은 세 개의 그러한 태스크를 생성하고 asyncio.gather를 사용하여 동시에 실행합니다. 태스크 A, B, C의 지연 시간이 각각 3, 1, 2초임에도 불구하고, 총 실행 시간은 합계(6초)가 아닌 최대 지연 시간(3초)에 가깝습니다. 이는 진정한 동시성을 보여줍니다.
yield from (async/await 이전) 및 await를 사용한 고급 코루틴 위임
async/await가 현대적인 방식이지만, 제너레이터 기반 코루틴의 yield from을 이해하는 것은 Python 비동기 기능의 진화에 대한 통찰력을 제공합니다. yield from을 사용하면 제너레이터가 작업의 일부를 다른 제너레이터에 위임할 수 있습니다. async/await를 사용하면 await를 사용하여 다른 코루틴을 호출하면 이 위임이 더 명시적입니다.
async/await를 사용하여 이를 설명해 보겠습니다.
import asyncio async def sub_task(name, delay): print(f" Sub-task {name}: Starting...") await asyncio.sleep(delay) print(f" Sub-task {name}: Finished.") return f"Result from {name}" async def main_task(task_id): print(f"Main task {task_id}: Starting...") # sub_task에 실행 위임, sub_task가 완료될 때까지 main_task 일시 중지 result_a = await sub_task(f"{task_id}-A", 1) result_b = await sub_task(f"{task_id}-B", 0.5) print(f"Main task {task_id}: Received '{result_a}' and '{result_b}'.") return f"Main task {task_id} complete with {result_a}, {result_b}" async def orchestrator(): print("Orchestrator: Kicking off main tasks...") results = await asyncio.gather( main_task("X"), main_task("Y") ) print("\nOrchestrator: All main tasks finished.") for r in results: print(f"Final result: {r}") if __name__ == "__main__": asyncio.run(orchestrator())
여기서 orchestrator는 main_task("X")와 main_task("Y")를 동시에 실행합니다. 각 main_task는 차례로 sub_task에 대해 순차적으로 await합니다. 이것은 코루틴이 복잡하고 중첩된 비동기 작업을 구축하는 방법을 보여줍니다. await 키워드는 호출 코루틴에서 대기하는 코루틴으로 효과적으로 제어권을 위임하고, 완료되면 호출자를 재개합니다.
asyncio를 사용한 동시성 기본 요소
asyncio는 코루틴 실행을 관리하기 위한 스레딩 구성과 유사하지만 코루틴을 위해 설계된 여러 기본 요소를 제공합니다.
- 잠금(Lock) (
asyncio.Lock): 한 번에 하나의 코루틴만 공유 리소스에 액세스하도록 보장하여 경쟁 조건을 방지합니다. - 세마포(Semaphore) (
asyncio.Semaphore): 리소스에 동시에 액세스할 수 있는 코루틴 수를 제한합니다. 연결 풀링 또는 속도 제한에 유용합니다. - 이벤트(Event) (
asyncio.Event): 코루틴이 서로 신호를 보낼 수 있도록 합니다. 코루틴은 이벤트가 설정될 때까지 기다릴 수 있으며, 다른 코루틴은 이벤트를 설정할 수 있습니다. - 큐(Queue) (
asyncio.Queue): 코루틴 간의 통신을 위한 스레드 안전(및 코루틴 안전) 큐로, 프로듀서-소비자 패턴을 활성화합니다.
이러한 기본 요소는 공유 상태 및 리소스를 안전하게 관리하는 강력한 비동기 애플리케이션을 구축하는 데 필수적입니다.
결론
Python의 제너레이터와 코루틴, 특히 asyncio 프레임워크는 효율적이고 논블로킹이며 동시적인 코드를 작성하기 위한 강력한 도구를 제공합니다. 제너레이터를 사용한 우아한 데이터 파이프라이닝부터 async/await를 사용한 복잡한 비동기 워크플로 오케스트레이션에 이르기까지, 이러한 고급 기법을 익히는 것은 까다로운 계산 및 I/O 바운드 작업을 더 큰 효율성과 응답성으로 처리할 수 있도록 지원합니다. 이러한 기능을 활용하는 것은 현대적이고 고성능인 애플리케이션을 위해 Python의 전체 잠재력을 발휘하는 데 중요합니다.