Python ETL 入門專案:自動化 CSV 匯入資料庫


Python ETL 入門專案:自動化 CSV 匯入資料庫

示例圖片

為什麼 ETL 很重要

資料是現代企業的核心。原始資料通常散落在 CSV 檔、API、試算表或日誌中。為了有效使用資料進行分析、機器學習或報表製作,我們需要將資料整合到結構化環境,例如關聯式資料庫。
這個 Extract, Transform, Load (ETL) 流程可以將零散、不規則的資料轉換成可靠且可查詢的資料集。
自動化 CSV 匯入通常是初學者第一個實作 ETL 專案,因為 CSV 格式在金融、電子商務等行業依然廣泛使用。

專案概述

本專案將設計一個 ETL 流程,從 CSV 檔提取原始資料,進行清理與增強,並匯入資料庫表格。
我們將使用 Python 實作,搭配 pandasSQLAlchemy
最後,我們將自動化整個流程,使新 CSV 檔能以最少人工干預完成處理。

環境設定

在撰寫程式前,請建立乾淨的 Python 環境並安裝必要套件。本教學使用 PostgreSQL,但方法可應用於 MySQL、SQLite 或其他關聯式資料庫。


# 建立虛擬環境 (建議使用)
python3 -m venv etl_env
source etl_env/bin/activate   # Windows: etl_env\Scripts\activate

# 安裝套件
pip install pandas sqlalchemy psycopg2

其中 pandas 用於資料處理,SQLAlchemy 提供 ORM 介面,psycopg2 是 PostgreSQL 驅動程式。依資料庫需求可更換驅動。

Extract 步驟:讀取 CSV

Extract 步驟主要是從 CSV 檔讀取原始資料。一個 CSV 代表一個資料集,或需處理資料夾中多個檔案。
使用 pandas 讀取 CSV 十分方便。


import pandas as pd

# 讀取單一 CSV 檔
df = pd.read_csv("sales_data.csv")

# 預覽資料
print(df.head())

若要處理多個 CSV:


import os
import pandas as pd

dataframes = []
for file in os.listdir("data/"):
    if file.endswith(".csv"):
        path = os.path.join("data/", file)
        temp_df = pd.read_csv(path)
        dataframes.append(temp_df)

df = pd.concat(dataframes, ignore_index=True)

Transform 步驟:清理與結構化資料

Transform 步驟將 CSV 原始資料轉換為可靠資訊。常見操作包括:

  • 標準化欄位名稱
  • 處理缺失值
  • 轉換資料型別(如字串轉日期)
  • 移除重複列
  • 衍生新欄位

# 標準化欄位名稱
df.columns = [col.strip().lower().replace(" ", "_") for col in df.columns]

# 處理缺失值
df["revenue"] = df["revenue"].fillna(0)

# 轉換日期字串為 datetime
df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")

# 移除重複列
df = df.drop_duplicates()

# 衍生新欄位: profit
df["profit"] = df["revenue"] - df["cost"]

Load 步驟:寫入資料庫

將轉換後的資料寫入資料庫。SQLAlchemy 提供統一介面。


from sqlalchemy import create_engine

# 使用實際資料庫帳號密碼
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/etl_db")

# 匯入資料
df.to_sql("sales", engine, if_exists="append", index=False)

自動化流程

開發階段可手動執行,但生產系統需自動化。可將 ETL 步驟整合成 Python 腳本,並使用 cron 或 Windows Task Scheduler 排程。


# etl_pipeline.py
def run_etl():
    df = extract_csv("data/")
    df = transform_data(df)
    load_to_db(df)

if __name__ == "__main__":
    run_etl()

每日午夜執行的 cron 範例:


0 0 * * * /usr/bin/python3 /path/to/etl_pipeline.py >> etl.log 2>&1

錯誤處理與日誌紀錄


import logging

logging.basicConfig(
    filename="etl.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s:%(message)s"
)

try:
    run_etl()
    logging.info("ETL 流程執行成功。")
except Exception as e:
    logging.error(f"ETL 流程失敗: {e}")

效能考量

大檔 CSV 可能達 GB 級別,建議:

  • 使用 chunksize 分批處理 CSV
  • 批次插入資料庫
  • 對常查詢欄位建立索引
  • 壓縮 CSV (gzip)

for chunk in pd.read_csv("large_file.csv", chunksize=100000):
    transform_chunk = transform_data(chunk)
    transform_chunk.to_sql("sales", engine, if_exists="append", index=False)

可擴展性與延伸

  • 支援更多資料格式 (JSON, Excel, API, Streaming)
  • 整合雲端儲存 (AWS S3, Google Cloud Storage, Azure Blob)
  • 使用分散式系統處理 (Spark, Dask)
  • 導入資料驗證框架 (Great Expectations, Pandera)
  • 實施 CI/CD 部署 ETL
示例圖片

最佳實務總結

  • 將 Extract、Transform、Load 分成獨立函式
  • 及早標準化與驗證資料
  • 使用日誌方便排錯與監控
  • 使用 cron 或工作流管理器自動化
  • 考慮可擴展性,CSV 可能演進為 API 或串流資料




发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注