「複数のシステムに散らばったデータを一元管理したい」「経営判断に使えるダッシュボードを構築したい」——このような課題を解決するために必要なのが、データパイプラインとETLの知識です。
本記事では、データパイプラインの基本概念から、ETL/ELTの設計パターン、Python・dbt・Airflowなどのツールを使った実装方法まで、データ基盤構築の実践的なガイドをお届けします。
データパイプラインとETLの基本概念
データパイプラインとは、データを発生源から目的地まで自動的に流す仕組みのことです。業務システム、Webアプリケーション、外部APIなどから生まれるデータを、分析やレポーティングに使える状態に加工して、データウェアハウスやBIツールに届けます。
ETLとは
ETLは、データパイプラインの代表的なパターンで、以下の3つのステップで構成されます。
Extract(抽出)
データソース(データベース、API、ファイルなど)からデータを取り出すステップです。複数のシステムから異なる形式のデータを収集します。
Transform(変換)
抽出したデータを、分析に適した形式に加工するステップです。データのクレンジング(不正値の除去・修正)、フォーマット統一、集計、結合などの処理を行います。
Load(ロード)
変換後のデータを、目的のデータストア(データウェアハウス、データレイクなど)に格納するステップです。
ETLとELTの違い
近年は、ETLに代わってELT(Extract→Load→Transform)が主流になりつつあります。
ETL(従来型)
データをロードする前に変換処理を行います。変換処理はETLツールのサーバー上で実行されます。オンプレミス環境や、データウェアハウスの処理能力が限られている場合に適しています。
ELT(現在の主流)
生データをそのままデータウェアハウスにロードし、変換処理はデータウェアハウス内で実行します。BigQuery、Snowflake、Redshiftなどのクラウドデータウェアハウスは強力な処理能力を持つため、大量データの変換を高速に行えます。
ELTのメリットは以下のとおりです。
・生データが保存されるため、後から変換ロジックを変更できる
・データウェアハウスの処理能力を活用でき、大量データの変換が高速
・ETLツール側のサーバーリソースを節約できる
・dbtなどのツールでSQL中心の変換処理が記述できる
データパイプラインのアーキテクチャ
データパイプラインの全体像を、レイヤー構成で理解しましょう。
データソース層
データの発生源です。典型的なデータソースには以下があります。
・業務データベース:PostgreSQL、MySQLなどの基幹システムDB
・SaaS API:Salesforce、HubSpot、Google Analytics、Stripe
・ファイル:CSV、Excel、JSON、XML
・ストリーミング:IoTセンサー、アプリケーションログ、クリックストリーム
取り込み層(Extract / Load)
データソースからデータを取得し、データウェアハウスにロードする層です。この部分を担うツールとして以下があります。
・Fivetran:300以上のコネクタを持つフルマネージドのデータ統合サービス
・Airbyte:オープンソースのデータ統合ツール。350以上のコネクタに対応
・カスタムスクリプト:PythonでAPIを呼び出してデータを取得する自作パイプライン
データウェアハウス層
データを一元的に蓄積・管理する層です。主要な選択肢は以下のとおりです。
・BigQuery(Google Cloud):サーバーレスで運用不要。従量課金でコストが最適化しやすい
・Snowflake:コンピュートとストレージの分離アーキテクチャ。柔軟なスケーリング
・Amazon Redshift:AWS環境との統合が強み。大規模データの分析に適する
・DuckDB:ローカル実行可能な軽量分析DB。小規模なデータ分析やプロトタイピングに便利
変換層(Transform)
データウェアハウス内でデータを加工する層です。dbt(data build tool)が変換層のデファクトスタンダードになっています。
消費層(BI・分析)
最終的にデータを活用する層です。BIツール(Metabase、Redash、Looker Studio)やデータアプリケーションがこの層に該当します。
Pythonでのデータ抽出(Extract)の実装
Pythonを使ったデータ抽出の基本的な実装パターンを紹介します。
API からのデータ抽出
import requests
import json
from datetime import datetime, timedelta
def extract_from_api(api_url: str, api_key: str, since: str) -> list[dict]:
"""REST APIからデータを抽出する"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
params = {
"updated_since": since,
"per_page": 100,
"page": 1
}
all_records = []
while True:
response = requests.get(api_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
records = data.get("results", [])
if not records:
break
all_records.extend(records)
params["page"] += 1
return all_records
# 使用例:昨日以降に更新されたデータを取得
yesterday = (datetime.now() - timedelta(days=1)).isoformat()
records = extract_from_api(
api_url="https://api.example.com/v1/orders",
api_key="your-api-key",
since=yesterday
)
print(f"取得件数: {len(records)}")
データベースからのデータ抽出
import psycopg2
import pandas as pd
def extract_from_postgres(
connection_string: str,
query: str,
params: dict = None
) -> pd.DataFrame:
"""PostgreSQLからデータを抽出してDataFrameで返す"""
conn = psycopg2.connect(connection_string)
try:
df = pd.read_sql_query(query, conn, params=params)
return df
finally:
conn.close()
# 使用例
df = extract_from_postgres(
connection_string="postgresql://user:pass@host:5432/dbname",
query="""
SELECT order_id, customer_id, amount, status, created_at
FROM orders
WHERE created_at >= %(since)s
""",
params={"since": "2026-03-01"}
)
print(f"取得行数: {len(df)}")
dbtを使ったデータ変換(Transform)
dbt(data build tool)は、SQLベースのデータ変換ツールです。SELECTステートメントを書くだけで、データウェアハウス内のデータ変換パイプラインを構築できます。
dbtの基本概念
モデル
dbtの基本単位はモデル(.sqlファイル)です。1つのモデルが1つのSELECTクエリに対応し、実行結果がテーブルまたはビューとしてデータウェアハウスに作成されます。
レイヤー構成
dbtのプロジェクトは、一般的に以下のレイヤーで構成します。
・staging(ステージング):生データの軽微な加工(型変換、カラム名の統一)
・intermediate(中間):ビジネスロジックの適用、テーブルの結合
・marts(マート):BIツールから参照する最終的な分析テーブル
dbtモデルの実装例
以下は、注文データを加工して月次売上サマリーを作成する例です。
-- models/staging/stg_orders.sql
-- 注文テーブルのステージングモデル
SELECT
id AS order_id,
customer_id,
CAST(amount AS DECIMAL(10, 2)) AS order_amount,
status AS order_status,
CAST(created_at AS TIMESTAMP) AS ordered_at
FROM {{ source('raw', 'orders') }}
WHERE status != 'cancelled'
-- models/marts/monthly_revenue.sql
-- 月次売上サマリーのマートモデル
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
monthly_summary AS (
SELECT
DATE_TRUNC('month', ordered_at) AS month,
COUNT(DISTINCT order_id) AS total_orders,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(order_amount) AS total_revenue,
AVG(order_amount) AS avg_order_value
FROM orders
WHERE order_status = 'completed'
GROUP BY DATE_TRUNC('month', ordered_at)
)
SELECT
month,
total_orders,
unique_customers,
total_revenue,
avg_order_value,
total_revenue - LAG(total_revenue) OVER (ORDER BY month) AS revenue_change
FROM monthly_summary
ORDER BY month DESC
dbtのテスト機能
dbtにはデータ品質を担保するテスト機能が組み込まれています。
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: order_amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: order_status
tests:
- accepted_values:
values: ['pending', 'completed', 'refunded']
dbt testコマンドを実行すると、定義したテストが自動的に実行され、データ品質の問題を早期に検出できます。
Apache Airflowによるパイプラインのオーケストレーション
データパイプラインが複雑になると、各タスクの実行順序、スケジューリング、エラーハンドリングを管理するオーケストレーションツールが必要になります。Apache Airflowは、最も広く使われているオーケストレーションツールです。
Airflowの基本概念
DAG(Directed Acyclic Graph)
Airflowでは、パイプラインをDAG(有向非巡回グラフ)として定義します。各タスクの依存関係と実行順序をPythonコードで記述します。
Operator
個々のタスクの実行ロジックを定義する部品です。PythonOperator(Python関数の実行)、BashOperator(シェルコマンドの実行)、BigQueryOperator(BigQueryクエリの実行)など、多数のOperatorが用意されています。
AirflowでのDAG定義例
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='daily_sales_pipeline',
default_args=default_args,
description='日次売上データパイプライン',
schedule_interval='0 6 * * *', # 毎朝6時に実行
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
extract_orders = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders_from_api,
)
extract_customers = PythonOperator(
task_id='extract_customers',
python_callable=extract_customers_from_db,
)
load_to_warehouse = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_data_to_bigquery,
)
run_dbt = BashOperator(
task_id='run_dbt_models',
bash_command='cd /opt/dbt && dbt run --models marts',
)
run_dbt_tests = BashOperator(
task_id='run_dbt_tests',
bash_command='cd /opt/dbt && dbt test',
)
# タスクの依存関係
[extract_orders, extract_customers] >> load_to_warehouse >> run_dbt >> run_dbt_tests
この例では、注文データと顧客データの抽出を並列で実行し、両方が完了したらデータウェアハウスにロードし、dbtによる変換とテストを順番に実行します。
Airflowの代替ツール
Airflowは高機能ですが、学習コストと運用負荷が高い面もあります。以下の代替ツールも検討しましょう。
・Prefect:Pythonネイティブで直感的なAPI。クラウドマネージド版あり
・Dagster:データアセット中心の設計。型安全性と開発者体験に優れる
・dbt Cloud:dbtの変換パイプラインに特化したマネージドサービス
・GitHub Actions:小規模なパイプラインならCI/CDツールで十分な場合もある
データパイプライン構築のベストプラクティス
データパイプラインの品質と安定性を高めるためのベストプラクティスを紹介します。
冪等性(べきとうせい)の確保
同じパイプラインを複数回実行しても、結果が同じになるように設計します。障害発生時の再実行を安全に行うために不可欠です。
・INSERT文の代わりにMERGE(UPSERT)を使用する
・各実行で「その日のデータを削除→再挿入」のパターンを採用する
・処理済みデータの重複チェックを組み込む
増分ロード(Incremental Loading)
毎回全データを処理するのではなく、前回処理以降に変更されたデータのみを処理する増分ロードを実装します。処理時間とコストを大幅に削減できます。
・updated_atカラムを利用して更新されたレコードを特定する
・CDCを使用してリアルタイムに変更を検出する
・ウォーターマーク(最後に処理した時刻)を管理テーブルに記録する
モニタリングとアラート
パイプラインの健全性を継続的に監視します。
・パイプラインの成功/失敗をSlack通知する
・データの件数や値の範囲を定期的にチェックする(データ品質モニタリング)
・処理時間の推移を記録し、異常な遅延を検知する
・Airflowのログを一元管理し、障害時の原因調査を容易にする
まとめ
データパイプラインとETLは、データ活用の基盤となる技術です。本記事のポイントを整理します。
・ETLはExtract→Transform→Loadの3ステップ、ELTはLoad後にDWH上で変換する現代的な手法
・データソース→取り込み→DWH→変換→消費の5層アーキテクチャで全体を設計する
・PythonでAPIやデータベースからのデータ抽出を実装する
・dbtを使えばSQLでデータ変換パイプラインを構築・テストできる
・Airflowでタスクの依存関係・スケジューリング・エラー処理を管理する
・冪等性の確保、増分ロード、モニタリングがパイプラインの品質を支える
まずは小規模なデータ(1〜2のデータソース)からパイプラインを構築し、運用のノウハウを蓄積することをおすすめします。基本的なパイプラインが安定稼働したら、データソースの追加やdbtによる変換の高度化に段階的に取り組みましょう。
関連記事
AIエージェント開発入門|自律型AIの仕組みと構築方法を解説【2026年版】
AI駆動コーディングワークフロー|Claude Code・Cursor・Copilotの実践的使い分け
プロンプトエンジニアリング上級編|Chain-of-Thought・Few-Shot・ReActの実践
APIレート制限の設計と実装|トークンバケット・スライディングウィンドウ解説
APIバージョニング戦略|URL・ヘッダー・クエリパラメータの使い分け
BIツール入門|Metabase・Redash・Looker Studioでデータ可視化する方法
チャットボット開発入門|LINE Bot・Slack Botの構築方法と活用事例
CI/CDパイプラインの基礎|継続的インテグレーション・デリバリーの全体像