Python의 비동기 백그라운드 작업을 통한 웹 앱 강화
Grace Collins
Solutions Engineer · Leapcell

소개
빠르게 변화하는 웹 개발 세계에서 사용자 경험은 종종 애플리케이션의 응답성에 달려 있습니다. 사용자가 대용량 파일을 업로드하거나, 복잡한 보고서를 생성하거나, 까다로운 데이터 분석을 처리하는 상황을 상상해 보세요. 이러한 작업이 메인 웹 스레드를 차단하면 애플리케이션이 멈추어 사용자에게 좌절감을 주고 잠재적으로 이탈하게 만들 수 있습니다. 비동기 백그라운드 작업은 이러한 딜레마에서 우리를 구하는 숨은 영웅입니다. 시간이 많이 소요되는 작업을 별도의 프로세스로 오프로드함으로써 웹 애플리케이션은 빠르고 상호작용 가능하게 유지되어 사용자에게 원활한 경험을 제공합니다. 이 게시물에서는 두 가지 인기 있는 라이브러리인 Dramatiq와 Arq를 사용하여 Python 웹 애플리케이션에서 간단하면서도 매우 효율적인 백그라운드 작업 처리를 구현하는 방법을 자세히 살펴보겠습니다.
핵심 개념 이해
Dramatiq와 Arq를 사용하여 접하게 될 비동기 작업 처리와 관련된 핵심 개념에 대한 공통된 이해를 먼저 정립해 보겠습니다.
- 작업 큐: 핵심적으로 작업 큐는 프로듀서(우리 웹 애플리케이션)가 작업을 enq(대기열에 넣음)하고 컨슈머(작업자 프로세스)가 비동기적으로 작업을 deq(대기열에서 꺼내)하여 실행할 수 있도록 하는 시스템입니다. 이는 작업 제출과 작업 실행을 분리합니다.
- 브로커: 브로커는 프로듀서와 컨슈머 간의 중개자입니다. 작업자(worker)가 처리할 수 있을 때까지 큐에 작업을 저장합니다. 인기 있는 브로커에는 Redis 및 RabbitMQ가 있습니다.
- 작업자(Worker): 작업자는 작업 큐를 지속적으로 폴링하고, 작업을 검색하고, 관련 코드를 실행하는 별도의 프로세스 또는 스레드입니다.
- 프로듀서: 우리 맥락에서 웹 애플리케이션은 작업을 생성하고 작업 큐로 보내는 프로듀서 역할을 합니다.
- 컨슈머: 작업자 프로세스는 큐에서 작업을 가져와 실제 작업을 수행하는 컨슈머입니다.
- 직렬화(Serialization): 작업이 큐로 전송될 때, 저장되고 나중에 재구성할 수 있는 형식으로 변환되어야 합니다. 이 프로세스를 직렬화(예: JSON 또는 MessagePack 사용)라고 합니다.
원리는 간단합니다. 웹 애플리케이션은 브로커를 통해 작업 큐로 작업을 보냅니다. 이 큐를 감시하는 별도의 작업자 프로세스가 작업을 가져와 백그라운드에서 실행하여 거의 즉시 메인 웹 요청 스레드를 해제합니다.
Dramatiq를 사용한 백그라운드 작업 구현
Dramatiq는 Python 3용으로 빠르고 기능이 완전한 작업 큐입니다. 현대 Python을 염두에 두고 설계되었으며 친근한 API, 강력한 타이핑 및 우수한 성능을 제공합니다. Redis 및 RabbitMQ와 같은 다양한 브로커를 지원합니다.
간단한 예제를 통해 설명하겠습니다. 사용자가 가입한 후 이메일을 보내는 것은 일반적인 백그라운드 작업입니다.
설정:
먼저 Dramatiq와 해당 Redis 브로커를 설치합니다.
pip install dramatiq dramatiq-redis
코드 예제:
백그라운드 작업을 위한 tasks.py
와 이러한 작업을 enq하는 Flask 웹 애플리케이션을 위한 app.py
두 개의 파일을 만들 것입니다.
tasks.py
:
import dramatiq from dramatiq.brokers.redis import RedisBroker import time import logging # 작업자를 위한 로깅 구성 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Redis 브로커 설정 redis_broker = RedisBroker(host="localhost", port=6379, db=0) dramatiq.set_broker(redis_broker) @dramatiq.actor(max_retries=3, min_backoff=1000) # 최대 3회 재시도, 최소 1초 대기 def send_welcome_email(user_email: str): """ 새 사용자에게 환영 이메일을 보내는 것을 시뮬레이션합니다. 이 작업은 시간이 걸리거나 실패할 수 있습니다. """ logging.info(f"{user_email}에게 환영 이메일을 보내려고 시도 중...") try: # 네트워크 호출 또는 무거운 처리 시뮬레이션 time.sleep(5) if user_email == "error@example.com": raise ValueError("error 이메일에 대한 시뮬레이션된 네트워크 문제") logging.info(f"{user_email}에게 환영 이메일을 성공적으로 보냈습니다.") return True except Exception as e: logging.error(f"{user_email}에게 이메일을 보내는 데 실패했습니다: {e}") # max_retries가 설정된 경우 Dramatiq가 자동으로 재시도합니다. raise # Dramatiq의 재시도 메커니즘을 트리거하기 위해 다시 발생시킴 @dramatiq.actor def generate_report(report_id: str, data: dict): """ 복잡한 보고서 생성을 시뮬레이션합니다. """ logging.info(f"데이터 {data}로 보고서 {report_id}를 생성 중입니다.") time.sleep(10) logging.info(f"보고서 {report_id} 생성 완료.") return f"보고서 {report_id}가 성공적으로 생성되었습니다." # 여기에 더 많은 작업을 추가할 수 있습니다.
app.py
(Flask 예제):
from flask import Flask, request, jsonify from tasks import send_welcome_email, generate_report import dramatiq import logging app = Flask(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @app.route('/signup', methods=['POST']) def signup(): user_email = request.json.get('email') if not user_email: return jsonify({"message": "이메일이 필요합니다"}), 400 # 작업을 enq합니다. 완료될 때까지 기다리지 않습니다. send_welcome_email.send(user_email) logging.info(f"{user_email}에 대한 가입 요청이 처리되었습니다. 이메일 작업이 enq되었습니다.") return jsonify({"message": "사용자 가입이 성공적으로 완료되었습니다. 곧 환영 이메일이 전송됩니다."}), 202 @app.route('/create_report', methods=['POST']) def create_report(): report_data = request.json.get('data') report_id = request.json.get('id') if not report_data or not report_id: return jsonify({"message": "보고서 ID와 데이터가 필요합니다"}), 400 generate_report.send(report_id, report_data) logging.info(f"보고서 생성 요청이 접수되었습니다. 보고서 {report_id}에 대한 작업이 enq되었습니다.") return jsonify({"message": f"보고서 {report_id} 생성이 백그라운드에서 시작되었습니다."}), 202 if __name__ == '__main__': # app이 실행되기 전에 dramatiq 브로커가 설정되었는지 확인합니다. # 실제 앱에서는 애플리케이션 팩토리 또는 구성을 통해 처리될 수 있습니다. from tasks import redis_broker # 브로커가 구성되었는지 확인하기 위해 임포트 logging.info("Flask 앱 시작 중...") app.run(debug=True, port=5000)
시스템 실행:
- Redis 시작: Redis 서버가 실행 중인지 확인합니다. 일반적으로
localhost:6379
입니다. - 작업자 시작: 터미널을 열고 Dramatiq 작업자를 실행합니다.
undefined
dramatiq tasks
이 명령은 Dramatiq에 `tasks.py`에 정의된 작업에 대한 작업자를 찾아 실행하도록 지시합니다.
3. **웹 애플리케이션 시작:** 다른 터미널을 열고 Flask 애플리케이션을 실행합니다.
```bash
python app.py
테스트:
JSON 본문 (예: {"email": "test@example.com"}
)과 함께 /signup
에 POST 요청을 보냅니다. Flask 앱은 즉시 응답하고(상태 코드 202), 작업자 터미널에서 이메일 보내기 시뮬레이션이 시작되는 것을 볼 수 있습니다. 재시도를 보려면 {"email": "error@example.com"}
으로 반복합니다.
{"id": "monthly-sales", "data": {"month": "jan", "year": 2023}}
과 함께 /create_report
에 POST 요청을 보냅니다.
Dramatiq는 @dramatiq.actor
와 같은 데코레이터를 사용하여 함수를 작업으로 등록합니다. send()
메서드는 작업을 enq합니다. 또한 재시도, 지연 및 복잡한 워크플로우 정의를 위한 간단한 API를 제공합니다.
Arq를 사용한 백그라운드 작업 구현
Arq는 Python 3용으로 또 다른 최신 고성능 작업 큐입니다. asyncio를 기반으로 구축되어 비동기 Python 애플리케이션에 훌륭한 선택입니다. Arq 또한 주로 Redis를 브로커로 사용합니다.
설정:
Arq 설치:
pip install arq
코드 예제:
Dramatiq와 유사하게 Arq 구성 및 작업을 위한 worker_settings.py
와 작업을 enq하는 FastAPI(최신 비동기 웹 프레임워크)를 위한 app.py
를 가집니다.
worker_settings.py
:
from arq import ArqRedis, create_pool import asyncio import logging import time logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') async def startup(ctx): """ 작업자가 시작될 때 한 번 호출됩니다. """ ctx['redis'] = await create_pool('redis://localhost:6379/1') # 명확성을 위해 Dramatiq와 다른 DB 사용 logging.info("Arq 작업자 시작 중...") async def shutdown(ctx): """ 작업자가 종료될 때 한 번 호출됩니다. """ await ctx['redis'].close() logging.info("Arq 작업자 종료 중...") async def send_welcome_email_arq(ctx, user_email: str): """ Arq를 사용하여 환영 이메일 보내기를 시뮬레이션합니다. """ logging.info(f"[Arq] {user_email}에게 환영 이메일을 보내려고 시도 중...") try: await asyncio.sleep(5) # 비동기 함수에서는 await asyncio.sleep 사용 if user_email == "error_arq@example.com": raise ValueError("error 이메일에 대한 시뮬레이션된 Arq 네트워크 문제") logging.info(f"[Arq] {user_email}에게 환영 이메일을 성공적으로 보냈습니다.") return True except Exception as e: logging.error(f"[Arq] {user_email}에게 이메일을 보내는 데 실패했습니다: {e}") raise # Arq의 재시도 메커니즘을 트리거하기 위해 다시 발생시킴 async def process_image_arq(ctx, image_url: str, user_id: int): """ 이미지 처리를 시뮬레이션합니다. """ logging.info(f"[Arq] 사용자 {user_id}를 위한 이미지 {image_url} 처리 중...") await asyncio.sleep(8) logging.info(f"[Arq] 사용자 {user_id}를 위한 이미지 {image_url} 처리 완료.") return {"status": "processed", "user_id": user_id, "image_url": image_url} # 작업자 설정 사전 class WorkerSettings: """ Arq 작업자 설정. """ functions = [send_welcome_email_arq, process_image_arq] on_startup = startup on_shutdown = shutdown # 여기에 재시도 정책 정의, 예: send_welcome_email_arq의 경우 # job_timeout = 60 # 초 # self_retry_delay = 5 # 첫 번째 재시도 전 대기 초
app.py
(FastAPI 예제):
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from arq import ArqRedis, create_pool from arq.connections import RedisSettings import asyncio import logging app = FastAPI() logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 요청 본문에 대한 Pydantic 모델 정의 class UserSignup(BaseModel): email: str class ImageProcess(BaseModel): image_url: str user_id: int # 전역 ArqRedis 연결 풀 arq_redis: ArqRedis = None @app.on_event("startup") async def startup_event(): global arq_redis arq_redis = await create_pool(RedisSettings(host='localhost', port=6379, database=1)) logging.info("FastAPI: Arq Redis 풀 생성됨.") @app.on_event("shutdown") async def shutdown_event(): global arq_redis if arq_redis: await arq_redis.close() logging.info("FastAPI: Arq Redis 풀 종료됨.") @app.post('/signup_arq', status_code=202) async def signup_arq(user: UserSignup): if not arq_redis: logging.error("Arq Redis 풀이 초기화되지 않았습니다.") return {"message": "내부 서버 오류: 작업 큐 준비 안 됨"}, 500 # 작업 enq await arq_redis.enqueue_job('send_welcome_email_arq', user.email) logging.info(f"[FastAPI] {user.email}에 대한 가입 요청이 처리되었습니다. Arq 이메일 작업이 enq되었습니다.") return {"message": "사용자 가입이 성공적으로 완료되었습니다. 곧 환영 이메일이 전송됩니다. (Arq)"} @app.post('/process_image_arq', status_code=202) async def process_image_endpoint(image_data: ImageProcess): if not arq_redis: logging.error("Arq Redis 풀이 초기화되지 않았습니다.") return {"message": "내부 서버 오류: 작업 큐 준비 안 됨"}, 500 await arq_redis.enqueue_job('process_image_arq', image_data.image_url, image_data.user_id) logging.info(f"[FastAPI] {image_data.image_url}에 대한 이미지 처리 요청이 접수되었습니다. Arq 작업이 enq되었습니다.") return {"message": f"이미지 {image_data.image_url} 처리가 백그라운드에서 시작되었습니다. (Arq)"} if __name__ == '__main__': import uvicorn logging.info("FastAPI 앱 시작 중...") uvicorn.run(app, host="0.0.0.0", port=8000)
시스템 실행:
- Redis 시작: Redis 서버가 실행 중인지 확인합니다.
- 작업자 시작: 터미널을 열고 Arq 작업자를 실행합니다.
arq worker worker_settings.WorkerSettings
- 웹 애플리케이션 시작: 다른 터미널을 열고 FastAPI 애플리케이션을 실행합니다.
uvicorn app:app --host 0.0.0.0 --port 8000 --reload
테스트:
{"email": "test_arq@example.com"}
과 함께 /signup_arq
에 POST 요청을 보냅니다. Dramatiq와 유사한 동작을 관찰하게 되며, 즉각적인 웹 응답과 Arq 작업자에 의한 백그라운드 처리가 이루어집니다. 오류 처리를 보려면 {"email": "error_arq@example.com"}
을 사용해 봅니다.
{"image_url": "https://example.com/image.jpg", "user_id": 123}
과 함께 /process_image_arq
에 POST 요청을 보냅니다.
Arq의 접근 방식은 작업으로 노출될 함수 목록을 포함하는 WorkerSettings
클래스 또는 모듈을 정의하는 것을 포함합니다. await arq_redis.enqueue_job()
을 사용하여 작업을 enq합니다. asyncio를 기반으로 구축되었으므로 Arq는 FastAPI 및 Starlette와 같은 비동기 웹 프레임워크와 자연스럽게 통합되어 경험이 매우 원활합니다. 또한 재시도, 예약된 작업 및 작업 결과를 지원합니다.
애플리케이션 시나리오
Dramatiq와 Arq는 작업 오프로드가 중요한 다양한 시나리오에서 모두 뛰어납니다:
- 이메일 보내기: 환영 이메일, 비밀번호 재설정, 알림 이메일.
- 이미지/동영상 처리: 크기 조정, 워터마킹, 형식 변환.
- 보고서 생성: 복잡한 데이터 내보내기, PDF 생성.
- 타사 API 호출: 느리거나 신뢰할 수 없는 통합 (예: 결제 처리, SMS 게이트웨이).
- 데이터 가져오기/내보내기: 대용량 CSV 파일 처리, 외부 시스템과의 데이터 동기화.
- 검색 색인: 데이터 변경 후 검색 색인 업데이트.
- 비동기 알림: 푸시 알림 또는 웹훅 보내기.
Dramatiq와 Arq 중 선택하기
둘 다 훌륭한 선택입니다. 다음은 간략한 안내입니다.
- Dramatiq: 프로젝트가 주로 동기식이거나 모든 작업 함수에서
async/await
를 사용하지 않는 보다 전통적인 작업 큐 API를 선호한다면 Dramatiq는 강력한 경쟁자입니다. 강력하고 실제 경험을 통해 검증되었습니다. - Arq: 비동기 Python(FastAPI, Starlette 등)을 기반으로 프로젝트를 구축하고 더 나은 성능과 통합을 위해 백그라운드 작업도 자연스럽게
async/await
를 사용하기를 원한다면 Arq는 이상적인 선택입니다. 이미 비동기 에코시스템에 있다면 기본 asyncio 지원을 통해 코드베이스를 단순화할 수 있습니다.
둘 다 재시도, 지연 및 강력한 오류 처리를 포함한 유사한 핵심 기능을 제공합니다. 선택은 종종 프로젝트의 기존 비동기 패러다임과 개인적인 선호도에 따라 달라집니다.
결론
백그라운드 작업을 구현하는 것은 응답성과 확장성이 뛰어난 Python 웹 애플리케이션을 구축하기 위한 기본적인 기술입니다. Dramatiq 또는 Arq와 같은 라이브러리를 원활하게 통합함으로써 개발자는 연산 집약적이거나 시간이 많이 소요되는 작업을 전용 작업자 프로세스에 위임할 수 있습니다. 이는 UI가 멈추는 것을 방지하여 사용자 경험을 향상할 뿐만 아니라 애플리케이션의 전반적인 복원력과 효율성을 향상시킵니다. 비동기 작업 처리를 채택하는 것은 보다 강력하고 만족스러운 웹 서비스를 구축하는 직접적인 경로입니다.