CloudComposer の Variables (環境変数)を gcloud cli で取得する

Airflow 1 系で設定されている環境変数を JSON ファイルとして GUI を使って書き出す方法の続報です。 前回、Airflow CLI からでも環境変数を JSON ファイルとして出力できる1が、手元から実行しても GCP 上のインスタンスにしか保存されなかったので諦めたと書きました。 ですが、その問題を解決できたので、解決方法を公開しておきます。 Cloud Storage に格納されるデータ | Cloud Composer | Google Cloudによると、Cloud Composer インスタンス内部のディレクトリは GCS にマッピングされているらしい。 マッピング関係は以下( GCP のドキュメントをそのまま引用) フォルダ Storage パス マッピングされたディレクトリ 説明 DAG gs://bucket-name/dags /home/airflow/gcs/dags 環境の DAG を保存します。このフォルダ内の DAG のみが環境にスケジュールされます。 プラグイン gs://bucket-name/plugins /home/airflow/gcs/plugins カスタム プラグインを保存します。カスタムのインハウス Airflow 演算子、フック、センサー、インターフェースなどです。 データ gs://bucket-name/data /home/airflow/gcs/data タスクが生成して使用するデータを保存します。このフォルダは、すべてのワーカーノードにマウントされます。 ログ gs://bucket-name/logs タスクの Airflow ログを保存します。ログは Airflow ウェブ インターフェースでも利用できます。 それを使えば、/home/airflow/gcs/data にファイルを保存すれば、CloudComposer が保有している GCS の gs://bucket-name/data にアクセスすれば、そのファイルが参照可能になる。...

October 17, 2022 · Shunya Ueta
Airflow で環境変数をJSONファイルとしてお手軽に書き出す方法

Airflow 1系で設定されている環境変数を JSON ファイルとしてGUIを使って書き出す方法

CloudComposer(GCP の Airflow のマネージドサービス)で運用している Airflow 1 系上で設定されている環境変数を JSON ファイルとして書き出したかったが、つまずいたのでメモを公開しておく。 Airflow の運用の理想としては、リポジトリをベースに CI 経由で CloudComposer を構築していくのがベスト。 だが、Airflow では GUI でお手軽に環境変数(Airflow では Variables という概念1)が設定でき、便利な半面、デメリットとしてリポジトリをベースにした Single Source of Truth の状態が保てなくなってしまう。 Airflow の環境変数を JSON ファイルとして書き出す方法 上部の Admin メニューから、Variablesをクリックしてページに移動 With selectedボタンをクリックすると Exportボタンがドロップダウンリスト内にでてくるので、これをクリックすれば Airflow に保存されている環境変数を JSON ファイルとして書き出せる Export できるとは初見でわからなかったのでこの UI を考えた人は罪深い。@naoさんに教えていただけて感謝! Airflow CLI からでも環境変数を JSON ファイルとして出力できるらしい2が、手元から 1 gcloud composer environments run COMPOSER_NAME --location asia-northeast1 variables -- --export env.json を実行してもローカルには保存されなかったので、実行結果は CloudComposer 内部のインスタンスに保存されている模様。...

October 4, 2022 · Shunya Ueta

Airflow でDAGを任意のタイミングで一度だけ実行する方法

Airflow で作成したDAGを自動で定期実行せずに、あえて手動実行で一度だけ実行したい場合もある。 DAGのオプションを以下のように設定する。 schedule_interval を “@once” に設定することで、一度だけDAGが実行される is_paused_upon_creation を True に設定することで、DAGが作成時に自動的に実行されず、DAGが停止状態で作成される。 デフォルトではFalseとなっており、自動実行される。 1 2 3 4 5 6 7 8 9 from airflow import DAG with DAG( dag_id="sellerscore_initial_batch", # NOTE: dosen't need to repeat schedule_interval="@once", # NOTE: we have to manually start the this DAG is_paused_upon_creation=True, ) as dag: Reference Airflow: schedule_interval = ‘@once’ Docs - airflow.models.dag

October 12, 2021 · Shunya Ueta

CloudComposer のDAGをCircleCIで更新する

Cloud Composer(Airflow) の DAG を GitHub リポジトリで管理して、CI によりリポジトリで管理している DAG を Pull Request がマージされると Cloud Composer の DAG へ同期する方法について説明する。 DAG は、ルートディレクトリ直下の dags/ というディレクトリに格納されている状態を前提とする。 以下の2つのコマンドラインツールを利用して実現できる。 Service Account の認証のために gcloud DAG の同期のために gsutil CircleCI によるワークフローの記述例は以下のとおり 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 version:2.1jobs:rsync-dags:working_directory:~/workspacedocker:- image:gcr.io/google.com/cloudsdktool/cloud-sdk:alpineenvironment:GOOGLE_APPLICATION_CREDENTIALS:/gcp-service-key.jsonsteps:- checkout- run:name:SyncDAGfoldertoGCS'sDAGfoldercommand:| echo "${CLOUD_COMPOSER_CREDENTIALS_JSON}" > ${GOOGLE_APPLICATION_CREDENTIALS}gcloudauthactivate-service-account--key-file${GOOGLE_APPLICATION_CREDENTIALS}gsutil-mrsync-d-rdags\"$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config....

October 4, 2021 · Shunya Ueta

GCPのCloud Composer のDAGを素早く・簡単にデバッグする

GCPでAirflow をマネージドサービスで使えるサービスで Cloud Composer が存在する。 BigQueryやBigTable, PubSub などGCPの各サービスをDAGとして定義してジョブを定期実行できるので非常に便利だが、その代わりDAGを実行するまで結果がわからないので、CloudComposer を一度実行するしか無いのでデバッグが困難になる傾向がある。 また、GitHubのリポジトリにDAGを保存して、CIでCloud Composerを更新するようしていると PRを都度作ってマージされないと確認できないという場合もある。 ローカルでDocker で走らせれば良いのじゃないかというツッコミがあると思いますが、結局 Cloud Composer 上での動作を確かめないといけないので、今回の記事を書くことにしました。 NOTE: 自分が使用しているComposerのversionはcomposer-1.8.0-Airflow-1.10.3 です。基本的にやれることは一緒だと思います。また、dev, prodで同一のDAGが走るCloud Composer を運用しているという前提です。 アプローチは2つ logger.info() を仕込んで、DAGのなかで何が起こっているかを理解する 1 2 3 4 5 import logging logger = logging.getLogger(__name__) logger.info() loggerをDAGを記述した Pythonファイルに仕込んで、内部で何が起こっているかを把握する。 各DAGのlogは、 GCPのCloud ComposerのページにアクセスしてAirflow webserver 列のボタンをクリックしてAirflowのWeb applicaiton にログイン 確認したいDAGをクリック DAG内のtask をクリックして表示されるモーダル内の View Logをクリックすると、loggerの情報が確認できる gstuil rsync コマンドでのGCSへのDAGの同期 gstuil rsyncコマンドを使うことで、リポジトリのDAGファイルをGCSに格納されている開発環境上のCloudComposer のDAGファイルに直接同期してPull Request マージ後のDAGの挙動を確認できる。 Cloud Composer のDAGは、自動作成されたGoogle Cloud Storage(GCS)に格納されており、GCSをCloud Composerが定期的に監視してCloud Composerを更新している。 つまり、GCS上のDAGファイルを直接更新してやるとそれがCloud Composerに反映される。 体感として2-3分に一度は監視されているので、ほぼ待ち状態がない状態で確認できて便利です。...

September 29, 2021 · Shunya Ueta