CloudComposer の Variables (環境変数)を gcloud cli で取得する

Airflow 1 系で設定されている環境変数を JSON ファイルとして GUI を使って書き出す方法の続報です。 前回、Airflow CLI からでも環境変数を JSON ファイルとして出力できる1が、手元から実行しても GCP 上のインスタンスにしか保存されなかったので諦めたと書きました。 ですが、その問題を解決できたので、解決方法を公開しておきます。 Cloud Storage に格納されるデータ | Cloud Composer | Google Cloudによると、Cloud Composer インスタンス内部のディレクトリは GCS にマッピングされているらしい。 マッピング関係は以下( GCP のドキュメントをそのまま引用) フォルダ Storage パス マッピングされたディレクトリ 説明 DAG gs://bucket-name/dags /home/airflow/gcs/dags 環境の DAG を保存します。このフォルダ内の DAG のみが環境にスケジュールされます。 プラグイン gs://bucket-name/plugins /home/airflow/gcs/plugins カスタム プラグインを保存します。カスタムのインハウス Airflow 演算子、フック、センサー、インターフェースなどです。 データ gs://bucket-name/data /home/airflow/gcs/data タスクが生成して使用するデータを保存します。このフォルダは、すべてのワーカーノードにマウントされます。 ログ gs://bucket-name/logs タスクの Airflow ログを保存します。ログは Airflow ウェブ インターフェースでも利用できます。 それを使えば、/home/airflow/gcs/data にファイルを保存すれば、CloudComposer が保有している GCS の gs://bucket-name/data にアクセスすれば、そのファイルが参照可能になる。...

October 17, 2022 · Shunya Ueta
Airflow で環境変数をJSONファイルとしてお手軽に書き出す方法

Airflow 1系で設定されている環境変数を JSON ファイルとしてGUIを使って書き出す方法

CloudComposer(GCP の Airflow のマネージドサービス)で運用している Airflow 1 系上で設定されている環境変数を JSON ファイルとして書き出したかったが、つまずいたのでメモを公開しておく。 Airflow の運用の理想としては、リポジトリをベースに CI 経由で CloudComposer を構築していくのがベスト。 だが、Airflow では GUI でお手軽に環境変数(Airflow では Variables という概念1)が設定でき、便利な半面、デメリットとしてリポジトリをベースにした Single Source of Truth の状態が保てなくなってしまう。 Airflow の環境変数を JSON ファイルとして書き出す方法 上部の Admin メニューから、Variablesをクリックしてページに移動 With selectedボタンをクリックすると Exportボタンがドロップダウンリスト内にでてくるので、これをクリックすれば Airflow に保存されている環境変数を JSON ファイルとして書き出せる Export できるとは初見でわからなかったのでこの UI を考えた人は罪深い。@naoさんに教えていただけて感謝! Airflow CLI からでも環境変数を JSON ファイルとして出力できるらしい2が、手元から 1 gcloud composer environments run COMPOSER_NAME --location asia-northeast1 variables -- --export env.json を実行してもローカルには保存されなかったので、実行結果は CloudComposer 内部のインスタンスに保存されている模様。...

October 4, 2022 · Shunya Ueta
bqutil.fn.typeof() の実行結果

OSS の Google BigQuery UDF `bqutil.fn` を使えば UDF の独自実装を置き換えられるかもしれない

TL;DR; UDF を独自実装する前に、bqutil.fnを眺めておくと車輪の再発明が回避できるかも 背景 SQL は、特定の処理を行う際にデータの型が同一でないとエラーが発生しますが、もとのスキーマを紹介するよりももっとお手軽にカラムの型を確認したいときがありませんか? 例えば、出力結果を見ただけでは、12345 が STRING なのか INT64 なのか判別不可能ですよね。(もし判別可能な方法知っている人いたら教えて下さい…) GCP による OSS UDF の bqutil.fn なのでお手軽に BigQuery の結果の型を確認したい時になにか良い方法がないかなと調べていたら、OSS でbqutil.fnという UDF が GCP から提供されていた。 例えば型の確認の場合、以下の ユーザー定義関数(UDF) はどの GCP プロジェクトから実行しても実行可能 1 bqutil.fn.typeof() このbqutil.fn はbigquery-utils/udfs/community/のディレクトリに格納されている UDF がbqutil という GCP プロジェクトのfn データセットに同期されているので、どの GCP プロジェクトの Google BigQuery から実行しても bqutil.fn.typeof()を実行可能にしているらしい。 頭良い This directory contains community contributed user-defined functions to extend BigQuery for more specialized usage patterns. Each UDF within this directory will be automatically synchronized to the bqutil project within the fn dataset for reference in queries....

January 20, 2022 · Shunya Ueta

Google Cloud Pub/Sub に公開された結果はDataflow template を使えばめちゃくちゃ簡単に確認できる

PubSub に出力された結果を確認するのって、なかなか手間がかかりませんか? 最近同僚に簡単な確認方法を教えてもらい、感動したのでそれを記事にしました。 確認方法 PubSub のメッセージを出力する Google Cloud Storage bucket を同一 GCP プロジェクトで作成する。 GCP の Pub/Sub ページに移動する 確認したい Pub/Sub topic をクリックする ページ下部にある CREATE SUBSCRIPTION ボタンを押すと選択肢で、Create subscription, Export to BigQuery, Export to Cloud Storageがあり、 Export to Cloud Storageを選択する。 BigQuery、 Google Cloud Storage への吐き出しを行い際に、自動的に subscription が生成される。 Export to Cloud Storage を選択すると、Text 形式か Abro 形式での出力にするかを選択できる。基本的には簡単に確認できる Text 形式を選ぶと良さげ。 選択後、下記のような設定画面が出てくるので情報を埋めていく。基本的には、どこの Google Cloud Storage に出力するかを埋めれば完了。 10m ほどすると Streaming job の Dataflow の起動が完了して、一定期間ごとに Pub/Sub の topic に公開されたデータがテキスト形式で出力され始めます。 出力された GCS の結果を眺めるには、 gsutil コマンドなどを使うのが簡単です。自分はgsutil cat の結果をコピーして VS Code で確認しています。 Cloud Dataflow のテンプレート機能については、端的に説明すると、GUI でパラメータを設定するだけで、Dataflow によるデータ処理が簡単に実行できるようになる機能です。...

November 5, 2021 · Shunya Ueta

CloudComposer のDAGをCircleCIで更新する

Cloud Composer(Airflow) の DAG を GitHub リポジトリで管理して、CI によりリポジトリで管理している DAG を Pull Request がマージされると Cloud Composer の DAG へ同期する方法について説明する。 DAG は、ルートディレクトリ直下の dags/ というディレクトリに格納されている状態を前提とする。 以下の2つのコマンドラインツールを利用して実現できる。 Service Account の認証のために gcloud DAG の同期のために gsutil CircleCI によるワークフローの記述例は以下のとおり 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 version:2.1jobs:rsync-dags:working_directory:~/workspacedocker:- image:gcr.io/google.com/cloudsdktool/cloud-sdk:alpineenvironment:GOOGLE_APPLICATION_CREDENTIALS:/gcp-service-key.jsonsteps:- checkout- run:name:SyncDAGfoldertoGCS'sDAGfoldercommand:| echo "${CLOUD_COMPOSER_CREDENTIALS_JSON}" > ${GOOGLE_APPLICATION_CREDENTIALS}gcloudauthactivate-service-account--key-file${GOOGLE_APPLICATION_CREDENTIALS}gsutil-mrsync-d-rdags\"$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config....

October 4, 2021 · Shunya Ueta