Prefect:Pythonスクリプトをそのまま本番データパイプラインに昇格させるワークフロー管理フレームワーク
bash ファイル名.sh を実行してください(中身を一度確認してから実行すると安心です)。
(macOS / Linux 環境が必要) Prefect:Pythonスクリプトをそのまま本番データパイプラインに昇格させるワークフロー管理フレームワーク
ひとことでいうと
Prefect(プリフェクト)は、Pythonの関数に2種類の目印(デコレーター)を付けるだけで、そのスクリプトを「本番で動くデータパイプライン(データを自動で処理する仕組み)」に変えられるツールです。スケジュール実行・失敗したときの自動リトライ・処理結果のキャッシュ保存・イベントをきっかけにした起動など、プロダクション運用に欠かせない機能がすぐに使えるようになります。自分のサーバーで動かす「Prefect Server」と、PrefectHQ社が運営するクラウドサービス「Prefect Cloud」のどちらも選べるため、チームの規模やセキュリティ方針に合わせて柔軟に導入できます。月に2億回以上のタスクを処理する大手企業の現場でも採用されており、個人の学習用から大規模なエンタープライズ環境まで幅広く対応しています。
こんな人におすすめ
1. 既存のPythonスクリプトをそのままパイプラインにしたいデータエンジニア
AirflowのようなXMLやDAG(有向非循環グラフ:処理の流れを図で表す形式)の定義ファイルを書くのに疲れた方に最適です。「いつものPythonの書き方で、複雑なパイプラインをそのまま組みたい」という方も、既存スクリプトに数行追加するだけで移行できます。
2. MLパイプラインを信頼性高く運用したい機械学習エンジニア
モデルの学習・評価・デプロイといった複数ステップを、失敗したときの自動リトライや状態管理付きで運用できます。「新しいデータが届いたら自動で学習を開始する」といったイベント駆動型の使い方にも対応しています。
3. 社内でバッチ処理をセルフホスト運用したいバックエンド・インフラエンジニア
定期的なバッチ処理・API連携・レポート生成などを、外部のSaaS(インターネット越しに使うソフトウェアサービス)に依存せず社内環境で完結させたい方に向いています。Prefect Serverを社内サーバーに立てれば、データを外部に出さずに運用できます。
インストール・使い方
Prefectを使うには Python 3.10以上が必要です。ターミナル(文字を打ち込んでパソコンに命令を送る画面)を開いて、以下の手順を進めてください。コマンドはコピー&ペーストで構いません。
Step 1: Prefectをインストールする
pip install -U prefect
pip(ピップ)はPythonのパッケージ(追加機能のまとまり)をインストールするためのコマンドです。-U は「最新バージョンに更新する」という意味で付けておくと安心です。高速なパッケージ管理ツール uv(ユーブイ)を使っている場合は、次のコマンドでも構いません。
uv add prefect
Step 2: 最小サンプルを動かしてみる
以下のコードを my_flow.py というファイルに保存して実行してみましょう。@task(アット・タスク)が個々の処理ステップを、@flow(アット・フロー)がそれらをまとめるワークフロー全体を表します。
from prefect import flow, task
import httpx
@task(log_prints=True)
def get_stars(repo: str):
url = f"https://api.github.com/repos/{repo}"
count = httpx.get(url).json()["stargazers_count"]
print(f"{repo} has {count} stars!")
@flow(name="GitHub Stars")
def github_stars(repos: list[str]):
for repo in repos:
get_stars(repo)
if __name__ == "__main__":
github_stars(["PrefectHQ/prefect"])
python my_flow.py
GitHubの指定リポジトリ(ソースコードの置き場所)のスター数を取得して表示するフローです。実行すると、Prefectがタスクの開始・完了を自動でログ(記録)に残してくれます。
Step 3: Web UIでログを確認する
prefect server start
このコマンドでローカル(自分のパソコン)にPrefectの管理サーバーが起動します。ブラウザで http://localhost:4200 を開くと、実行済みフローのログ・ステータス・タイムラインをグラフィカルに確認できます。「どのタスクが何秒かかったか」「どこで失敗したか」が一目でわかります。
Step 4: スケジュール実行(デプロイメント)に変換する
スクリプトの最後の部分を次のように書き換えると、定期実行する「デプロイメント(本番への登録)」に昇格できます。
if __name__ == "__main__":
github_stars.serve(
name="first-deployment",
cron="* * * * *", # 毎分実行
parameters={"repos": ["PrefectHQ/prefect"]}
)
cron(クーロン)はLinuxなどで使われるスケジュール書式です。"* * * * *" は「毎分実行」を意味します。このプロセスを起動したままにしておくと、Prefect Serverがスケジュールを検知して自動実行します。UIやCLI(コマンドラインの操作画面)からの手動実行も可能です。
動かしてみた
実際の環境で動作を確認しました。まず押さえておくと安心なのが、インストール方法によって挙動が異なるという点です。ソースコードから直接ビルドする方法では、内部でGitコマンドが必要になる場面があります。一方、PyPI経由の pip install -U prefect を使えばこの問題は発生しません。公式でも推奨しているこの方法で進めると、スムーズに導入できます。
動作確認した環境では Python 3.12.13 が動いており、Prefectの推奨スペック(Python 3.10以上)を問題なく満たしていました。
リポジトリの構成も確認しました。ui/ フォルダにはReact + Vite + TypeScriptで作られたフロントエンド(ブラウザ画面)が含まれており、schemas/ には設定ファイルのJSONスキーマ、benches/ にはベンチマーク(性能計測)、load_testing/ には負荷試験スクリプトが入っています。エンタープライズ規模の利用を最初から想定した作りになっていることが分かります。
試す前に知っておくとよいこと
- Python 3.10以上が必要です(3.12でも動作確認済み)
- インストールはPyPI経由(
pip install prefect)が最もシンプルで確実です - Web UIを使う場合はポート4200が空いていることを確認してください
- ソースコードからビルドする場合は
gitコマンドが別途必要です
ブラウザで試す
Prefectフローのコードテンプレートをインタラクティブに生成できるデモが公式から提供されています。タスク名・リトライ回数・スケジュール(cron式)を入力すると、そのまま使えるPythonコードが出力されます。インストール前に使い感を確かめたい場合や、cron式の書き方に迷ったときに便利です。
はじめの一歩(実践のコツ)
すぐに試せる行動を順番にまとめます。難しいところから始めなくて大丈夫です。
- まず動かす:
pip install prefectでインストールし、Step 2のサンプルをコピー&ペーストしてそのまま実行してみる - リトライを追加する:
@task(retries=3, retry_delay_seconds=5)と書くだけで、失敗したタスクを3回まで5秒おきに自動で再実行できるようになる - UIでログを見る:
prefect server startを実行してブラウザでhttp://localhost:4200を開くと、どのタスクが成功・失敗・スキップされたかが一目でわかる - スケジュール実行に変換する:
.serve()にcron=を追加するだけで、定期実行デプロイメントに昇格できる - インテグレーションを活用する:
prefect-slackなどの公式パッケージを追加すると、処理完了・失敗をSlackに自動通知できる - チームで使う: 慣れてきたらPrefect Cloudに接続すると、実行履歴やアラートをチームで共有できる
活用例
- ETL(データ抽出・変換・ロード)の自動化: データベースからデータを取り出し、加工してから別のシステムに格納する一連の処理を
@taskで定義する。失敗したステップだけを自動でリトライするため、全体を最初からやり直すコストを大幅に削減できます。 - MLパイプラインの管理: データ前処理・モデル学習・精度評価・モデルレジストリへの登録を一連のフローとして記述する。各ステップの「成功・失敗・スキップ」を細かく追跡できるため、どこで問題が起きたかすぐに特定できます。
- 定期レポートの自動生成: 毎朝8時に売上データを集計してCSVに出力し、Slackに通知するフローをcron設定で自動化する。
prefect-slackなどのインテグレーションパッケージと組み合わせることで、通知連携も数行で実装できます。 - API連携バッチ処理: 外部APIから定期的にデータを取得して社内データベースに格納する処理を、スケジュール管理付きで安定運用できます。一時的なAPIエラーが発生してもリトライが自動で動くため、監視の手間が減ります。
- データ品質チェックの自動化: 新しいデータが届いたタイミングをトリガーにして、データの件数・型・欠損値をチェックするフローを自動起動できます。問題が見つかれば即座に通知する仕組みと組み合わせて活用できます。
- 個人開発・学習: 小規模なスクレイピング(Web上の情報収集)・通知ボット・定期集計スクリプトをPrefectで管理することで、本番環境での運用ノウハウ(ログ確認・リトライ設計)を実務と同じ感覚で身に付けられます。
用語とポイント解説
Flow(フロー)
@flow デコレーターを付けたPythonの関数のことです。ワークフロー全体の「入れ物」にあたり、Prefectはこの単位で実行履歴やログを管理します。かんたんに言うと、「複数の処理ステップをひとまとめにした大きな作業単位」です。フロー同士を入れ子にすることもできます。
Task(タスク)
@task デコレーターを付けた関数で、フローの中の個々の処理ステップです。タスク単位でリトライ・キャッシュ・ログが個別に制御されます。かんたんに言うと、「フローの中の一つひとつの仕事」です。失敗箇所の特定がしやすくなり、問題の切り分けが容易になります。
Deployment(デプロイメント)
フローをスケジュール・パラメータ付きでサーバーに登録したものです。.serve() メソッドを呼ぶだけで作成できます。かんたんに言うと、「フローを定期自動実行できる状態に登録すること」です。一度登録すれば、UIやCLIから管理・手動実行もできます。
State(ステート)
各フロー・タスクの実行状態を表す情報です。Completed(完了)・Failed(失敗)・Retrying(リトライ中)などの状態が記録されます。かんたんに言うと、「今どの処理がどうなっているかを示すラベル」です。Web UIでグラフィカルに確認できます。
Prefect Server(プリフェクト・サーバー)
自分のサーバーに立てるオーケストレーションサーバー(処理の司令塔)と、ブラウザで操作できるWeb UIのセットです。prefect server start コマンドで起動します。かんたんに言うと、「フローの管理画面を自社内で動かす仕組み」です。外部にデータを出さずに運用できます。
Prefect Cloud(プリフェクト・クラウド)
PrefectHQ社が運営するマネージドサービス(サーバー管理を業者が代わりにやってくれるサービス)版です。かんたんに言うと、「インフラ管理を任せてフローの開発だけに集中できるクラウド版」です。チームでの共有・アラート設定・RBAC(権限管理)が容易になります。
デコレーター(Decorator)
Pythonの関数の直前に @ マークで書く修飾子のことです。関数の中身を変えずに、ログ記録・リトライ・キャッシュなどの機能を後付けできます。かんたんに言うと、「関数に特別な能力をあとから追加するための書き方」です。Prefectでは @flow と @task の2種類を使います。
cron(クーロン)
Linuxなどで使われるスケジュール記述書式です。「分・時・日・月・曜日」の5つの値またはワイルドカード(*)で実行タイミングを指定します。かんたんに言うと、「“毎日朝8時”や”毎分”を記号で書く方法」です。例えば 0 8 * * * は「毎日8:00に実行」を意味します。
versioningit(バージョニングイット)
Gitのタグ情報からPythonパッケージのバージョン番号を自動生成するツールです。Prefectのソースコードからビルドする際に内部で使われています。かんたんに言うと、「ソースから手動ビルドするときに、バージョン番号を自動で決めてくれる仕組み」です。PyPI経由でインストールする場合は意識する必要はありません。
Prefectは、シンプルなスクリプトから始めてチーム規模のパイプライン運用まで、同じコードベースで段階的に成長させられるのが最大の強みです。ぜひ定期バッチ処理の自動化やMLパイプラインの管理などに活用してみてはいかがでしょうか。