使用 Apache Airflow 與 Python 自動化 ETL 流程
前言
在今日的資料驅動環境中,企業需要穩健的資料管線來支援即時與準即時分析。
然而,傳統依賴手動撰寫腳本的 ETL 流程,不但缺乏彈性,也容易出錯。
自動化已成為資料工程的核心能力。
Apache Airflow 是由 Airbnb 開發的開源工作流程編排工具,如今已成為 ETL 自動化的事實標準。
它與 Python 搭配後,能提供高度可擴展、可程式化且易管理的 ETL 工作流程。
為何要自動化 ETL 流程?
- 可擴展性:自動化讓管線能應對資料成長與多來源資料整合。
- 可靠性:減少手動錯誤,流程更加穩定。
- 監控能力:提供日誌、指標、錯誤追蹤與告警。
- 可重現性:版本化 ETL 流程,可重跑與稽核。
- 現代基礎架構整合:可與雲端、資料湖、分散式計算整合。
認識 Apache Airflow
Airflow 是一套以程式碼撰寫工作流程的工具,強調可視化、模組化與可擴展性。
核心概念
- DAG(有向無環圖):定義工作流程中任務的依賴與順序。
- Task:流程中的一個步驟,例如執行 Python、SQL 或 API。
- Operators:操作器,負責執行特定功能。
- Scheduler:排程 DAG、觸發任務。
- Executor:負責執行任務的後端(Local、Celery、Kubernetes)。
- Web UI:可視化 DAG、任務狀態與日誌。
Airflow 架構(簡述)
- Web Server:提供 UI。
- Scheduler:決定任務何時執行。
- Executor:實際執行任務。
- Metadata DB:儲存狀態、XCom、日誌索引。
使用 Airflow 與 Python 建立 ETL
ETL 包含三個階段:
- Extract — 擷取資料
- Transform — 清洗轉換
- Load — 載入目的地
建立基本 DAG
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def extract():
print("Extracting data...")
def transform():
print("Transforming data...")
def load():
print("Loading data...")
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 1, 1),
'retries': 1,
}
dag = DAG(
dag_id='etl_pipeline_example',
default_args=default_args,
schedule_interval='@daily',
)
extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)
extract_task >> transform_task >> load_task
Airflow 架構(詳細說明)
- Web Server:顯示 DAG、任務狀態、日誌。
- Scheduler:排程與提交任務。
- Executor:負責執行任務(Celery / Kubernetes 建議用於生產)。
- Metadata DB:儲存 DAG 版本、任務狀態、XCom 資料與審計紀錄。
實作範例:每日氣象資料自動化
假設你需要每日將天氣資料擷取並寫入資料庫,可設計如下流程:
- Extract:從 Weather API 擷取資料
- Transform:清洗 JSON、統一欄位
- Load:寫入 PostgreSQL
import requests
def extract_weather():
response = requests.get(
"https://api.weatherapi.com/v1/current.json?key=API_KEY&q=London"
)
response.raise_for_status()
return response.json()
def transform_weather(payload):
return {
'city': payload.get('location', {}).get('name'),
'temp_c': payload.get('current', {}).get('temp_c'),
'condition': payload.get('current', {}).get('condition', {}).get('text')
}
def load_to_postgres(record, conn_uri):
print(f"Would insert {record} into {conn_uri}")
Airflow 進階功能
TaskFlow API
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval='@daily', start_date=datetime(2025, 1, 1), catchup=False)
def weather_etl():
@task
def extract():
return {"temp": 20, "city": "London"}
@task
def transform(data):
data["temp_celsius"] = data["temp"]
return data
@task
def load(data):
print(f"Loading data: {data}")
data = extract()
transformed = transform(data)
load(transformed)
weather_etl()
Sensors / Hooks / XComs
Airflow 支援等待外部條件(Sensors)、快速整合外部系統(Hooks)、任務之間資料傳遞(XCom)。
大型資料建議使用物件存儲,而非 XCom。
監控與告警
Airflow 能與 Prometheus、Grafana、Slack、Email、PagerDuty 整合,用於告警與 SLA 管控。
ETL 自動化最佳實踐
- 模組化任務:保持任務獨立可測試。
- 參數化:使用 Variables、Connections、Jinja 模板。
- 重試策略:設定 retries 與 backoff。
- 版本控制:DAG 放入 Git、搭配 CI/CD。
- 本機測試:利用 Docker Compose 測試 DAG。
- 管理機密:使用 Secrets Backend。
- 選對 Executor:生產建議 Celery 或 Kubernetes。
- 資料品質控制:加入 Great Expectations。
Airflow 與其他 ETL 工具比較
- Apache NiFi:可視化資料流,擅長即時處理。
- Luigi:較簡單,但功能不如 Airflow 完整。
- Talend:企業級 GUI 工具,轉換能力強。
- Prefect:比 Airflow 更現代化的編排框架。
Airflow 的 Python-first 與可擴展特性,使其在複雜批次 ETL 管線中脫穎而出。
案例研究:高韌性 ETL 管線
某全球企業採用 Talend 搭配 Airflow 架設高度韌性的 ETL 系統,達成:
- 自動重試與錯誤修復
- 詳細 lineage 與觀察性
- 與 Great Expectations 整合的資料品質控管
ETL 架構設計考量
- 資料量:大量資料時可使用 Spark、Dask。
- 延遲需求:即時 ETL 可整合 Kafka。
- 雲端整合:Airflow 支援 S3、BigQuery、Azure Lake。
- 資料品質:必須加入驗證步驟。
未來趨勢
- 宣告式管線:YAML、SQL 形式的管線定義。
- AI/ML 整合:監控模型漂移、自動 retrain。
- Serverless ETL:以 AWS Lambda、GCP Cloud Run 執行。
- 資料可觀察性:更強的 metadata 追蹤能力。
- Example-driven ETL:研究引導式 ETL 產生。
結論
使用 Apache Airflow 與 Python 自動化 ETL 工作流程,是現代資料工程的核心能力。
Airflow 讓資料團隊能快速構建、監控並維護可擴展與高可靠性的資料管線。
無論是每日氣象資料、金融交易處理,還是機器學習模型輸入,
Airflow 都能提供彈性且可信賴的基礎建設。







