Great Expectations(GX)는 데이터 품질을 코드로 정의하고 검증하는 파이썬 라이브러리다. 파이프라인에서 데이터 오류를 조기에 발견하고 문서를 자동 생성한다.
Great Expectations 핵심 개념
| 개념 | 설명 |
|---|
| Expectation | 데이터에 대한 단언 (예: 컬럼이 not null이어야 한다) |
| Expectation Suite | 여러 Expectation의 집합 |
| Data Context | GX 프로젝트 설정 및 자산 관리 |
| Checkpoint | Suite + 데이터소스 실행 단위 |
| Data Docs | 자동 생성된 HTML 문서 |
Expectation Suite 생성
python
import great_expectations as gx
context = gx.get_context()
# 데이터소스 연결
datasource = context.sources.add_pandas_filesystem(
name="my_datasource",
base_directory="./data/",
)
asset = datasource.add_csv_asset(name="orders", batching_regex=r"orders_(?P<date>d{8}).csv")
# Suite 생성
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
batch_request=asset.build_batch_request(),
expectation_suite_name="orders_suite",
)
# Expectations 정의
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between(
"amount", min_value=0, max_value=1_000_000
)
validator.expect_column_values_to_be_in_set(
"status", ["placed", "shipped", "completed", "cancelled"]
)
validator.expect_table_row_count_to_be_between(
min_value=1000, max_value=1_000_000
)
validator.save_expectation_suite()
Checkpoint 실행
python
checkpoint = context.add_or_update_checkpoint(
name="orders_checkpoint",
validator=validator,
)
result = checkpoint.run()
if not result.success:
# Slack 알림 또는 파이프라인 중단
raise ValueError("데이터 품질 검증 실패!")
# Data Docs 업데이트
context.build_data_docs()
Airflow/dbt 통합
python
from great_expectations.core.batch import BatchRequest
# Airflow 태스크로 실행
gx_task = GreatExpectationsOperator(
task_id="validate_orders",
datasource_name="my_datasource",
data_connector_name="default_inferred_data_connector",
data_asset_name="orders",
expectation_suite_name="orders_suite",
return_json_dict=True,
)