使用 Python 進行批次資料處理:合併與正規化資料集

使用 Python 進行批次資料處理:合併與正規化資料集

目錄

為何批次資料處理如此重要

在今日這個資料驅動的世界中,企業每天都在從各種來源產生大量資料——銷售報表、行銷分析、感測器日誌、使用者行為記錄等。然而,這些資料通常分散於不同檔案、資料庫或系統中,使得直接進行分析變得困難。

批次資料處理讓我們能高效地讀取、合併、轉換並標準化這些資料,整合成統一的格式。這是商業智慧、預測建模以及機器學習流程中不可或缺的一步。

設定你的 Python 環境

在開始批次處理之前,讓我們先確保安裝了合適的工具。Python 的 Pandas 是資料操作的核心,而 NumPyScikit-learn 則用於資料轉換與正規化。


pip install pandas numpy scikit-learn pyarrow

這些套件的用途如下:

  • Pandas:用於讀取、合併與轉換資料集。
  • NumPy:高效處理大型數值陣列。
  • Scikit-learn:資料正規化與前處理。
  • PyArrow:支援高效能的 Parquet 資料格式。

高效讀取多個資料集

假設我們有多個月度銷售報表,例如 sales_2023_01.csvsales_2023_02.csv 等。


import pandas as pd
import glob

# 取得所有符合條件的 CSV 檔案
csv_files = glob.glob("data/sales_*.csv")

# 將每個 CSV 檔案讀入 DataFrame
dfs = [pd.read_csv(file) for file in csv_files]

print(f"成功載入 {len(dfs)} 個資料集。")

如果資料來自不同格式,Pandas 也支援 ExcelJSONSQLParquet 等格式。

合併資料集

在載入多個資料集後,下一步就是將它們合併為一個單一的 DataFrame。方法取決於資料集是否具有相同結構。

垂直合併


df = pd.concat(dfs, ignore_index=True)

處理不一致的欄位名稱


rename_map = {
    "Date": "date",
    "Sales": "sales",
    "Product": "product"
}
dfs = [df.rename(columns=rename_map) for df in dfs]
df = pd.concat(dfs, ignore_index=True)

水平合併

有時資料集代表相同實體的不同層面,例如要合併 salescustomer 資料:


df = pd.merge(sales_df, customer_df, on="customer_id", how="left")

資料正規化與清理

正規化能確保資料一致性,為後續分析與機器學習做好準備。

處理缺失值


df = df.fillna({"sales": 0})
df = df.dropna(subset=["product"])

標準化日期格式


df["date"] = pd.to_datetime(df["date"], errors="coerce")

清理分類變數


df["product"] = df["product"].str.lower().str.strip()

Min-Max 正規化


from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
df["sales_normalized"] = scaler.fit_transform(df[["sales"]])

效能優化策略

當資料量龐大時,效能就成為關鍵。以下是幾個實用技巧:

  • 使用 Parquet 取代 CSV — 更快的讀寫速度。

df.to_parquet("combined.parquet", engine="pyarrow")
  • 分批讀取大型 CSV

for chunk in pd.read_csv("bigdata.csv", chunksize=500000):
    process(chunk)
  • 使用替代資料框架:如 DaskPolars 處理 TB 級資料。

端到端的管線範例

以下是一個完整範例,將上述所有步驟整合成實際流程:


import pandas as pd
import glob
from sklearn.preprocessing import MinMaxScaler

# 1. 讀取多個 CSV
csv_files = glob.glob("data/sales_*.csv")
dfs = [pd.read_csv(file) for file in csv_files]

# 2. 合併資料集
df = pd.concat(dfs, ignore_index=True)

# 3. 清理與正規化
df = df.fillna({"sales": 0})
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df["product"] = df["product"].str.lower().str.strip()

scaler = MinMaxScaler()
df["sales_normalized"] = scaler.fit_transform(df[["sales"]])

# 4. 儲存處理後的資料
df.to_parquet("processed_sales.parquet", engine="pyarrow")

進階技巧與最佳實踐

  • 使用 pandera 等結構驗證工具,確保資料架構一致。
  • 應用 模糊比對 處理不一致的欄位名稱。
  • 利用 多核心平行處理 加速 ETL 流程。
  • Apache AirflowPrefect 整合,打造生產級批次流程。

遵循這些最佳實踐,你就能建立出穩定、高效且可擴展的資料處理管線,為分析與機器學習奠定良好基礎。




发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注