使用 Python 進行批次資料處理:合併與正規化資料集
目錄
為何批次資料處理如此重要
在今日這個資料驅動的世界中,企業每天都在從各種來源產生大量資料——銷售報表、行銷分析、感測器日誌、使用者行為記錄等。然而,這些資料通常分散於不同檔案、資料庫或系統中,使得直接進行分析變得困難。
批次資料處理讓我們能高效地讀取、合併、轉換並標準化這些資料,整合成統一的格式。這是商業智慧、預測建模以及機器學習流程中不可或缺的一步。
設定你的 Python 環境
在開始批次處理之前,讓我們先確保安裝了合適的工具。Python 的 Pandas 是資料操作的核心,而 NumPy 與 Scikit-learn 則用於資料轉換與正規化。
pip install pandas numpy scikit-learn pyarrow
這些套件的用途如下:
- Pandas:用於讀取、合併與轉換資料集。
- NumPy:高效處理大型數值陣列。
- Scikit-learn:資料正規化與前處理。
- PyArrow:支援高效能的 Parquet 資料格式。
高效讀取多個資料集
假設我們有多個月度銷售報表,例如 sales_2023_01.csv、sales_2023_02.csv 等。
import pandas as pd
import glob
# 取得所有符合條件的 CSV 檔案
csv_files = glob.glob("data/sales_*.csv")
# 將每個 CSV 檔案讀入 DataFrame
dfs = [pd.read_csv(file) for file in csv_files]
print(f"成功載入 {len(dfs)} 個資料集。")
如果資料來自不同格式,Pandas 也支援 Excel、JSON、SQL、Parquet 等格式。
合併資料集
在載入多個資料集後,下一步就是將它們合併為一個單一的 DataFrame。方法取決於資料集是否具有相同結構。
垂直合併
df = pd.concat(dfs, ignore_index=True)
處理不一致的欄位名稱
rename_map = {
"Date": "date",
"Sales": "sales",
"Product": "product"
}
dfs = [df.rename(columns=rename_map) for df in dfs]
df = pd.concat(dfs, ignore_index=True)
水平合併
有時資料集代表相同實體的不同層面,例如要合併 sales 與 customer 資料:
df = pd.merge(sales_df, customer_df, on="customer_id", how="left")
資料正規化與清理
正規化能確保資料一致性,為後續分析與機器學習做好準備。
處理缺失值
df = df.fillna({"sales": 0})
df = df.dropna(subset=["product"])
標準化日期格式
df["date"] = pd.to_datetime(df["date"], errors="coerce")
清理分類變數
df["product"] = df["product"].str.lower().str.strip()
Min-Max 正規化
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
df["sales_normalized"] = scaler.fit_transform(df[["sales"]])
效能優化策略
當資料量龐大時,效能就成為關鍵。以下是幾個實用技巧:
- 使用 Parquet 取代 CSV — 更快的讀寫速度。
df.to_parquet("combined.parquet", engine="pyarrow")
- 分批讀取大型 CSV
for chunk in pd.read_csv("bigdata.csv", chunksize=500000):
process(chunk)
- 使用替代資料框架:如
Dask或Polars處理 TB 級資料。
端到端的管線範例
以下是一個完整範例,將上述所有步驟整合成實際流程:
import pandas as pd
import glob
from sklearn.preprocessing import MinMaxScaler
# 1. 讀取多個 CSV
csv_files = glob.glob("data/sales_*.csv")
dfs = [pd.read_csv(file) for file in csv_files]
# 2. 合併資料集
df = pd.concat(dfs, ignore_index=True)
# 3. 清理與正規化
df = df.fillna({"sales": 0})
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df["product"] = df["product"].str.lower().str.strip()
scaler = MinMaxScaler()
df["sales_normalized"] = scaler.fit_transform(df[["sales"]])
# 4. 儲存處理後的資料
df.to_parquet("processed_sales.parquet", engine="pyarrow")
進階技巧與最佳實踐
- 使用 pandera 等結構驗證工具,確保資料架構一致。
- 應用 模糊比對 處理不一致的欄位名稱。
- 利用 多核心平行處理 加速 ETL 流程。
- 與 Apache Airflow 或 Prefect 整合,打造生產級批次流程。
遵循這些最佳實踐,你就能建立出穩定、高效且可擴展的資料處理管線,為分析與機器學習奠定良好基礎。


