Prefect는 Python 네이티브 데이터 파이프라인 오케스트레이션 도구다. 데코레이터 기반으로 기존 Python 코드를 파이프라인으로 전환하며, Prefect Cloud 또는 자체 서버로 관리한다.
Prefect vs Airflow 비교
| 항목 | Prefect | Airflow |
|---|
| 코드 스타일 | 데코레이터 기반, Pythonic | DAG 클래스 기반 |
| 동적 파이프라인 | 런타임에 가능 | 제한적 |
| 에러 처리 | 자동 재시도, 알림 | 수동 설정 |
| UI | Prefect Cloud/Server | 내장 |
| 배포 | 유연 (어디서나) | 복잡 |
기본 Flow 정의
python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
retries=3,
retry_delay_seconds=60,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1),
)
def extract_data(source_url: str) -> list:
import httpx
response = httpx.get(source_url)
return response.json()
@task
def transform_data(raw_data: list) -> list:
return [
{
"id": item["id"],
"value": item["value"] * 1.1, # 변환 로직
}
for item in raw_data
if item.get("active", False)
]
@task
def load_data(data: list, table: str):
# DB 삽입 로직
print(f"{len(data)}개 레코드를 {table}에 로드")
@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline(source_url: str = "https://api.example.com/data"):
raw = extract_data(source_url)
transformed = transform_data(raw)
load_data(transformed, table="processed_data")
return len(transformed)
if __name__ == "__main__":
etl_pipeline()
배포 및 스케줄링
python
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
deployment = Deployment.build_from_flow(
flow=etl_pipeline,
name="daily-etl",
schedule=CronSchedule(cron="0 2 * * *", timezone="Asia/Seoul"),
work_queue_name="default",
parameters={"source_url": "https://api.example.com/prod/data"},
)
deployment.apply()
알림 설정
python
from prefect.blocks.notifications import SlackWebhook
slack = SlackWebhook(url="https://hooks.slack.com/...")
slack.save("slack-webhook")
# Flow에서 알림
@flow
def my_flow():
try:
do_work()
except Exception as e:
slack_webhook = SlackWebhook.load("slack-webhook")
slack_webhook.notify(f"Flow 실패: {e}")
raise