Python ETL 入門專案:自動化 CSV 匯入資料庫
目錄
為什麼 ETL 很重要
資料是現代企業的核心。原始資料通常散落在 CSV 檔、API、試算表或日誌中。為了有效使用資料進行分析、機器學習或報表製作,我們需要將資料整合到結構化環境,例如關聯式資料庫。
這個 Extract, Transform, Load (ETL) 流程可以將零散、不規則的資料轉換成可靠且可查詢的資料集。
自動化 CSV 匯入通常是初學者第一個實作 ETL 專案,因為 CSV 格式在金融、電子商務等行業依然廣泛使用。
專案概述
本專案將設計一個 ETL 流程,從 CSV 檔提取原始資料,進行清理與增強,並匯入資料庫表格。
我們將使用 Python 實作,搭配 pandas 與 SQLAlchemy。
最後,我們將自動化整個流程,使新 CSV 檔能以最少人工干預完成處理。
環境設定
在撰寫程式前,請建立乾淨的 Python 環境並安裝必要套件。本教學使用 PostgreSQL,但方法可應用於 MySQL、SQLite 或其他關聯式資料庫。
# 建立虛擬環境 (建議使用)
python3 -m venv etl_env
source etl_env/bin/activate # Windows: etl_env\Scripts\activate
# 安裝套件
pip install pandas sqlalchemy psycopg2
其中 pandas 用於資料處理,SQLAlchemy 提供 ORM 介面,psycopg2 是 PostgreSQL 驅動程式。依資料庫需求可更換驅動。
Extract 步驟:讀取 CSV
Extract 步驟主要是從 CSV 檔讀取原始資料。一個 CSV 代表一個資料集,或需處理資料夾中多個檔案。
使用 pandas 讀取 CSV 十分方便。
import pandas as pd
# 讀取單一 CSV 檔
df = pd.read_csv("sales_data.csv")
# 預覽資料
print(df.head())
若要處理多個 CSV:
import os
import pandas as pd
dataframes = []
for file in os.listdir("data/"):
if file.endswith(".csv"):
path = os.path.join("data/", file)
temp_df = pd.read_csv(path)
dataframes.append(temp_df)
df = pd.concat(dataframes, ignore_index=True)
Transform 步驟:清理與結構化資料
Transform 步驟將 CSV 原始資料轉換為可靠資訊。常見操作包括:
- 標準化欄位名稱
- 處理缺失值
- 轉換資料型別(如字串轉日期)
- 移除重複列
- 衍生新欄位
# 標準化欄位名稱
df.columns = [col.strip().lower().replace(" ", "_") for col in df.columns]
# 處理缺失值
df["revenue"] = df["revenue"].fillna(0)
# 轉換日期字串為 datetime
df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
# 移除重複列
df = df.drop_duplicates()
# 衍生新欄位: profit
df["profit"] = df["revenue"] - df["cost"]
Load 步驟:寫入資料庫
將轉換後的資料寫入資料庫。SQLAlchemy 提供統一介面。
from sqlalchemy import create_engine
# 使用實際資料庫帳號密碼
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/etl_db")
# 匯入資料
df.to_sql("sales", engine, if_exists="append", index=False)
自動化流程
開發階段可手動執行,但生產系統需自動化。可將 ETL 步驟整合成 Python 腳本,並使用 cron 或 Windows Task Scheduler 排程。
# etl_pipeline.py
def run_etl():
df = extract_csv("data/")
df = transform_data(df)
load_to_db(df)
if __name__ == "__main__":
run_etl()
每日午夜執行的 cron 範例:
0 0 * * * /usr/bin/python3 /path/to/etl_pipeline.py >> etl.log 2>&1
錯誤處理與日誌紀錄
import logging
logging.basicConfig(
filename="etl.log",
level=logging.INFO,
format="%(asctime)s %(levelname)s:%(message)s"
)
try:
run_etl()
logging.info("ETL 流程執行成功。")
except Exception as e:
logging.error(f"ETL 流程失敗: {e}")
效能考量
大檔 CSV 可能達 GB 級別,建議:
- 使用
chunksize分批處理 CSV - 批次插入資料庫
- 對常查詢欄位建立索引
- 壓縮 CSV (gzip)
for chunk in pd.read_csv("large_file.csv", chunksize=100000):
transform_chunk = transform_data(chunk)
transform_chunk.to_sql("sales", engine, if_exists="append", index=False)
可擴展性與延伸
- 支援更多資料格式 (JSON, Excel, API, Streaming)
- 整合雲端儲存 (AWS S3, Google Cloud Storage, Azure Blob)
- 使用分散式系統處理 (Spark, Dask)
- 導入資料驗證框架 (Great Expectations, Pandera)
- 實施 CI/CD 部署 ETL
最佳實務總結
- 將 Extract、Transform、Load 分成獨立函式
- 及早標準化與驗證資料
- 使用日誌方便排錯與監控
- 使用 cron 或工作流管理器自動化
- 考慮可擴展性,CSV 可能演進為 API 或串流資料







