如何使用 Python 與 PostgreSQL 建立 API 整合資料管線





如何使用 Python 與 PostgreSQL 建立 API 整合資料管線

在現今以資料為核心的世界裡,從各類 API 擷取、儲存與分析資料,是資料工程師與分析師的日常任務之一。
無論是即時氣象資訊、社群媒體動態、IoT 感測器數據,或金融 API,建立高效的
資料管線(Data Pipeline)
不僅能節省大量手動處理時間,也能確保資料的一致性與可重現性。

本篇教學將帶你一步步學會如何:

  • 使用 requests 擷取 API 資料
  • 轉換 JSON 結構為可儲存格式
  • 將資料載入 PostgreSQL 資料庫
  • 自動化整個資料管線流程
  • 查詢並分析儲存的資料

步驟一:環境設定

請先確認你的系統中已安裝 PythonPostgreSQL,接著安裝必要的 Python 套件:

pip install requests psycopg2 pandas
  • requests:用於發送 HTTP 請求以擷取 API 資料。
  • psycopg2:Python 的 PostgreSQL 連線驅動。
  • pandas:資料清理與轉換的利器。

建立 PostgreSQL 資料庫

在 PostgreSQL 中建立一個新的資料庫:

CREATE DATABASE api_data;

步驟二:從 API 擷取資料

這裡以 Open-Meteo API 為例,擷取逐小時氣溫資料:


import requests

# 定義 API 端點
api_url = "https://api.open-meteo.com/v1/forecast"
params = {
    "latitude": 35.6895,
    "longitude": 139.6917,
    "hourly": "temperature_2m"
}

# 發送 GET 請求
response = requests.get(api_url, params=params)

if response.status_code == 200:
    data = response.json()
    print("資料擷取成功!")
else:
    print("資料擷取失敗:", response.status_code)
小提示: 執行任何請求後,務必檢查 response.status_code,避免管線中斷或錯誤。

步驟三:轉換資料以便儲存

API 通常回傳 JSON 格式資料,接著可轉換為結構化的 Pandas DataFrame:


import pandas as pd

# 取出氣溫與時間戳
temperature_data = data['hourly']['temperature_2m']
timestamps = data['hourly']['time']

# 建立 DataFrame
df = pd.DataFrame({'timestamp': timestamps, 'temperature': temperature_data})

# 轉換時間格式
df['timestamp'] = pd.to_datetime(df['timestamp'])
print(df.head())

步驟四:在 PostgreSQL 建立資料表

建立資料表以儲存轉換後的資料:


CREATE TABLE weather_data (
    id SERIAL PRIMARY KEY,
    timestamp TIMESTAMP,
    temperature FLOAT
);

步驟五:將資料載入 PostgreSQL

將 DataFrame 內容逐列寫入資料庫中:


import psycopg2

# 連線到 PostgreSQL
conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()

# 逐列插入資料
for _, row in df.iterrows():
    cur.execute(
        "INSERT INTO weather_data (timestamp, temperature) VALUES (%s, %s)",
        (row['timestamp'], row['temperature'])
    )

# 提交並關閉連線
conn.commit()
cur.close()
conn.close()
print("資料已成功寫入 PostgreSQL!")

步驟六:自動化資料管線

你可以透過排程工具自動化整個資料擷取與載入流程。

在 Linux/Mac 使用 Cron


crontab -e
0 0 * * * /usr/bin/python3 /path/to/data_pipeline.py

在 Windows 使用工作排程器

  • 開啟 Task Scheduler
  • 建立新工作並設定觸發時間
  • 設定動作:python C:\path\to\data_pipeline.py
小提示: 建議使用 Python 的 logging 模組來記錄自動化管線的執行狀況。

步驟七:查詢資料進行分析


conn = psycopg2.connect(
    dbname="api_data", user="your_user", password="your_password", host="localhost"
)
cur = conn.cursor()

cur.execute("SELECT * FROM weather_data WHERE timestamp > NOW() - INTERVAL '1 day'")
rows = cur.fetchall()

for row in rows:
    print(row)

cur.close()
conn.close()

結語

透過本篇教學,你已學會如何使用 Python 與 PostgreSQL 建立完整的 API 資料管線。
從擷取、轉換、儲存、排程到分析,你現在具備了打造自動化資料流程的基礎能力,
未來可以進一步整合如 Airflow 或 Prefect 等工具,讓資料管線更強大與可擴展。




发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注