Databricks는 Apache Spark 기반의 통합 분석 플랫폼으로, 데이터 엔지니어링, 데이터 과학, ML을 하나의 플랫폼에서 처리한다. Delta Lake를 기본으로 사용한다.
Databricks vs EMR vs Synapse
| 항목 | Databricks | EMR | Synapse Analytics |
|---|
| 기반 | Spark | Spark/Hive 등 | Spark + SQL DW |
| Delta Lake | 네이티브 | 별도 설치 | 부분 지원 |
| 협업 | 노트북 공유 | 없음 | 제한적 |
| 클라우드 | 멀티 클라우드 | AWS | Azure |
| MLflow | 통합 | 없음 | 제한적 |
Delta Lake 테이블 생성
python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
spark = SparkSession.builder .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .getOrCreate()
# Delta 테이블 생성
df = spark.read.format("parquet").load("s3://raw-data/orders/")
df.write.format("delta") .mode("overwrite") .partitionBy("year", "month") .save("s3://delta-lake/orders/")
Delta Lake ACID 작업
python
# MERGE (Upsert)
deltaTable = DeltaTable.forPath(spark, "s3://delta-lake/customers/")
deltaTable.alias("target").merge(
updates.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute()
# 시간 여행 (Time Travel)
df_yesterday = spark.read.format("delta") .option("versionAsOf", 5) .load("s3://delta-lake/customers/")
# 변경 이력 확인
deltaTable.history().show()
Databricks Jobs & Workflows
python
# Databricks Jobs API를 통한 파이프라인 설정
{
"name": "daily-etl-pipeline",
"tasks": [
{
"task_key": "ingest",
"notebook_task": {"notebook_path": "/ETL/01-ingest"},
"new_cluster": {"spark_version": "14.3.x-scala2.12", "num_workers": 4}
},
{
"task_key": "transform",
"depends_on": [{"task_key": "ingest"}],
"notebook_task": {"notebook_path": "/ETL/02-transform"}
}
],
"schedule": {"quartz_cron_expression": "0 0 2 * * ?"}
}
Unity Catalog는 Databricks의 통합 거버넌스 계층으로, 테이블/뷰/함수의 세분화된 접근 제어와 데이터 계보(Lineage)를 제공한다.