GCPのCloud Composer のDAGを素早く・簡単にデバッグする
GCPでAirflow をマネージドサービスで使えるサービスで Cloud Composer が存在する。 BigQueryやBigTable, PubSub などGCPの各サービスをDAGとして定義してジョブを定期実行できるので非常に便利だが、その代わりDAGを実行するまで結果がわからないので、CloudComposer を一度実行するしか無いのでデバッグが困難になる傾向がある。
また、GitHubのリポジトリにDAGを保存して、CIでCloud Composerを更新するようしていると PRを都度作ってマージされないと確認できないという場合もある。
ローカルでDocker で走らせれば良いのじゃないかというツッコミがあると思いますが、結局 Cloud Composer 上での動作を確かめないといけないので、今回の記事を書くことにしました。
NOTE: 自分が使用しているComposerのversionはcomposer-1.8.0-Airflow-1.10.3
です。基本的にやれることは一緒だと思います。また、dev
, prod
で同一のDAGが走るCloud Composer を運用しているという前提です。
アプローチは2つ
logger.info() を仕込んで、DAGのなかで何が起こっているかを理解する
import logging
logger = logging.getLogger(__name__)
logger.info()
loggerをDAGを記述した Pythonファイルに仕込んで、内部で何が起こっているかを把握する。
各DAGのlogは、
- GCPのCloud Composerのページにアクセスして
Airflow webserver
列のボタンをクリックしてAirflowのWeb applicaiton にログイン - 確認したいDAGをクリック
- DAG内のtask をクリックして表示されるモーダル内の
View Log
をクリックすると、loggerの情報が確認できる
gstuil rsync
コマンドでのGCSへのDAGの同期
gstuil rsync
コマンドを使うことで、リポジトリのDAGファイルをGCSに格納されている開発環境上のCloudComposer のDAGファイルに直接同期してPull Request マージ後のDAGの挙動を確認できる。
Cloud Composer のDAGは、自動作成されたGoogle Cloud Storage(GCS)に格納されており、GCSをCloud Composerが定期的に監視してCloud Composerを更新している。 つまり、GCS上のDAGファイルを直接更新してやるとそれがCloud Composerに反映される。 体感として2-3分に一度は監視されているので、ほぼ待ち状態がない状態で確認できて便利です。
以下のコマンドでリポジトリのDAGファイルをGCSに反映させます。
gsutil -m rsync -d -r dags "$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config.dagGcsPrefix)")"
{XXX} には使用する環境の情報を置換してください。
"$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config.dagGcsPrefix)")"
- 指定したGCP Project で動くCloud Composer のDAGが格納されているGCSのパスを取得できる。
gsutil -m rsync -d -r dags
-m
は並列処理-d
は元のディレクトリに存在しないファイルがコピー先にあれば削除(ミラーリング)。これにより、GCS上でDAGを新たに作成して、デバッグしていたとしても、CIが走ればリポジトリにないDAGファイルは削除され、リポジトリのDAGと完全に同期される。-r
はディレクトリとしてコピー- 上記のオプションにより
dags
ディレクトリのDAGファイルをGCSにミラーリングで同期を行う。
Composer のための正規のコマンドはあるが…
また、以下のように gcloud composer environments storage dags import
コマンドで更新する方法もあるので、そちらを使っても大丈夫です。
実行内容自体はGCSのファイルを変更するのと変わりません。
ですが、ディレクトリを対象にしたファイルの同期には対応していないので、上記で説明したコマンドのほうが遥かに楽です。
gcloud composer environments storage dags import \
--environment {ENVIRONMENT_NAME} \
--location {LOCATION} \
--source {LOCAL_FILE_TO_UPLOAD}
NOTE: プロダクションのDAGを直接書き換えるのは危険なのでやめましょう。
Reference
関連しているかもしれない記事
- gcloud commands で Pub/Sub に jsonファイルをメッセージとして公開 (Pusblish) する
- GKE 上にて Pythonで logger.info() を行うとCloud logging では stderr に保存され、すべてエラーになる問題への対処法
- 遅すぎる `pandas.read_gbq` を使わずに、Google BigQueryから高速にデータを読み込む
- How to connect the Google Compute Engine via Visual Studio Code
📮 📧 🐏: 記事への感想のおたよりをおまちしてます。 お気軽にお送りください。 メールアドレス入力があればメールで返信させていただきます。 もちろんお返事を希望せずに単なる感想だけでも大歓迎です。
このサイトの更新情報をRSSで配信しています。 お好きなフィードリーダーで購読してみてください。
このウェブサイトの運営や著者の活動を支援していただける方を募集しています。 もしよろしければ、Buy Me a Coffee からサポート(投げ銭)していただけると、著者の活動のモチベーションに繋がります✨
Amazonでほしいものリストも公開しているので、こちらからもサポートしていただけると励みになります。