
백엔드
Celery AdvancedCelery 심화
Celery는 Python의 분산 태스크 큐 시스템으로, 브로커(Redis, RabbitMQ)를 통해 비동기 작업을 분산 처리한다. 태스크 라우팅, 체이닝, 코드, 그룹 등 고급 워크플로우를 지원한다.
기본 설정
python
# celery_app.py
from celery import Celery
app = Celery(
'myapp',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['tasks.email', 'tasks.report']
)
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Seoul',
enable_utc=True,
# 작업자 수
worker_concurrency=4,
# 태스크 시간 제한
task_soft_time_limit=300, # 5분 경고
task_time_limit=360, # 6분 강제 종료
)태스크 라우팅
python
# tasks/email.py
from celery_app import app
@app.task(bind=True, max_retries=3, queue='email')
def send_email(self, to: str, subject: str, body: str):
try:
email_service.send(to, subject, body)
except TemporaryError as exc:
raise self.retry(exc=exc, countdown=60) # 1분 후 재시도
# tasks/report.py
@app.task(queue='heavy', rate_limit='10/m') # 분당 10회 제한
def generate_report(report_id: int):
# CPU 집약적 작업 (별도 큐)
return ReportGenerator.run(report_id)
# 라우팅 설정
app.conf.task_routes = {
'tasks.email.*': {'queue': 'email'},
'tasks.report.*': {'queue': 'heavy'},
'*': {'queue': 'default'}
}
# 워커 실행 (큐별)
# celery -A celery_app worker -Q email -c 8
# celery -A celery_app worker -Q heavy -c 2고급 워크플로우 (Canvas)
python
from celery import chain, group, chord, signature
# chain: 순차 실행 (이전 결과 → 다음 인자)
result = chain(
fetch_data.s(user_id=1001),
process_data.s(),
save_result.s()
)()
# group: 병렬 실행
parallel_tasks = group(
process_region.s('kr'),
process_region.s('us'),
process_region.s('eu')
)()
results = parallel_tasks.get()
# chord: 병렬 실행 후 콜백
chord_result = chord(
group(
analyze_chunk.s(chunk) for chunk in data_chunks
),
merge_results.s() # 모든 병렬 태스크 완료 후 실행
)()
# 복잡한 워크플로우 조합
pipeline = chain(
validate_input.s(data),
group(
enrich_with_geo.s(),
enrich_with_profile.s()
),
aggregate_enrichments.s(),
notify_user.s()
)