Airflow でDAGを任意のタイミングで一度だけ実行する方法

Airflow で作成したDAGを自動で定期実行せずに、あえて手動実行で一度だけ実行したい場合もある。 DAGのオプションを以下のように設定する。 schedule_interval を “@once” に設定することで、一度だけDAGが実行される is_paused_upon_creation を True に設定することで、DAGが作成時に自動的に実行されず、DAGが停止状態で作成される。 デフォルトではFalseとなっており、自動実行される。 1 2 3 4 5 6 7 8 9 from airflow import DAG with DAG( dag_id="sellerscore_initial_batch", # NOTE: dosen't need to repeat schedule_interval="@once", # NOTE: we have to manually start the this DAG is_paused_upon_creation=True, ) as dag: Reference Airflow: schedule_interval = ‘@once’ Docs - airflow.models.dag

October 12, 2021 Â· Shunya Ueta

CloudComposer のDAGをCircleCIで更新する

Cloud Composer(Airflow) の DAG を GitHub リポジトリで管理して、CI によりリポジトリで管理している DAG を Pull Request がマージされると Cloud Composer の DAG へ同期する方法について説明する。 DAG は、ルートディレクトリ直下の dags/ というディレクトリに格納されている状態を前提とする。 以下の2つのコマンドラインツールを利用して実現できる。 Service Account の認証のために gcloud DAG の同期のために gsutil CircleCI によるワークフローの記述例は以下のとおり 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 version:2.1jobs:rsync-dags:working_directory:~/workspacedocker:- image:gcr.io/google.com/cloudsdktool/cloud-sdk:alpineenvironment:GOOGLE_APPLICATION_CREDENTIALS:/gcp-service-key.jsonsteps:- checkout- run:name:SyncDAGfoldertoGCS'sDAGfoldercommand:| echo "${CLOUD_COMPOSER_CREDENTIALS_JSON}" > ${GOOGLE_APPLICATION_CREDENTIALS}gcloudauthactivate-service-account--key-file${GOOGLE_APPLICATION_CREDENTIALS}gsutil-mrsync-d-rdags\"$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config....

October 4, 2021 Â· Shunya Ueta

GCPのCloud Composer のDAGを素早く・簡単にデバッグする

GCPでAirflow をマネージドサービスで使えるサービスで Cloud Composer が存在する。 BigQueryやBigTable, PubSub などGCPの各サービスをDAGとして定義してジョブを定期実行できるので非常に便利だが、その代わりDAGを実行するまで結果がわからないので、CloudComposer を一度実行するしか無いのでデバッグが困難になる傾向がある。 また、GitHubのリポジトリにDAGを保存して、CIでCloud Composerを更新するようしていると PRを都度作ってマージされないと確認できないという場合もある。 ローカルでDocker で走らせれば良いのじゃないかというツッコミがあると思いますが、結局 Cloud Composer 上での動作を確かめないといけないので、今回の記事を書くことにしました。 NOTE: 自分が使用しているComposerのversionはcomposer-1.8.0-Airflow-1.10.3 です。基本的にやれることは一緒だと思います。また、dev, prodで同一のDAGが走るCloud Composer を運用しているという前提です。 アプローチは2つ logger.info() を仕込んで、DAGのなかで何が起こっているかを理解する 1 2 3 4 5 import logging logger = logging.getLogger(__name__) logger.info() loggerをDAGを記述した Pythonファイルに仕込んで、内部で何が起こっているかを把握する。 各DAGのlogは、 GCPのCloud ComposerのページにアクセスしてAirflow webserver 列のボタンをクリックしてAirflowのWeb applicaiton にログイン 確認したいDAGをクリック DAG内のtask をクリックして表示されるモーダル内の View Logをクリックすると、loggerの情報が確認できる gstuil rsync コマンドでのGCSへのDAGの同期 gstuil rsyncコマンドを使うことで、リポジトリのDAGファイルをGCSに格納されている開発環境上のCloudComposer のDAGファイルに直接同期してPull Request マージ後のDAGの挙動を確認できる。 Cloud Composer のDAGは、自動作成されたGoogle Cloud Storage(GCS)に格納されており、GCSをCloud Composerが定期的に監視してCloud Composerを更新している。 つまり、GCS上のDAGファイルを直接更新してやるとそれがCloud Composerに反映される。 体感として2-3分に一度は監視されているので、ほぼ待ち状態がない状態で確認できて便利です。...

September 29, 2021 Â· Shunya Ueta