bqutil.fn.typeof() の実行結果

OSS の Google BigQuery UDF `bqutil.fn` を使えば UDF の独自実装を置き換えられるかもしれない

TL;DR; UDF を独自実装する前に、bqutil.fnを眺めておくと車輪の再発明が回避できるかも 背景 SQL は、特定の処理を行う際にデータの型が同一でないとエラーが発生しますが、もとのスキーマを紹介するよりももっとお手軽にカラムの型を確認したいときがありませんか? 例えば、出力結果を見ただけでは、12345 が STRING なのか INT64 なのか判別不可能ですよね。(もし判別可能な方法知っている人いたら教えて下さい…) GCP による OSS UDF の bqutil.fn なのでお手軽に BigQuery の結果の型を確認したい時になにか良い方法がないかなと調べていたら、OSS でbqutil.fnという UDF が GCP から提供されていた。 例えば型の確認の場合、以下の ユーザー定義関数(UDF) はどの GCP プロジェクトから実行しても実行可能 1 bqutil.fn.typeof() このbqutil.fn はbigquery-utils/udfs/community/のディレクトリに格納されている UDF がbqutil という GCP プロジェクトのfn データセットに同期されているので、どの GCP プロジェクトの Google BigQuery から実行しても bqutil.fn.typeof()を実行可能にしているらしい。 頭良い This directory contains community contributed user-defined functions to extend BigQuery for more specialized usage patterns. Each UDF within this directory will be automatically synchronized to the bqutil project within the fn dataset for reference in queries....

January 20, 2022 Â· Shunya Ueta

Google Cloud Pub/Sub に公開された結果はDataflow template を使えばめちゃくちゃ簡単に確認できる

PubSub に出力された結果を確認するのって、なかなか手間がかかりませんか? 最近同僚に簡単な確認方法を教えてもらい、感動したのでそれを記事にしました。 確認方法 PubSub のメッセージを出力する Google Cloud Storage bucket を同一 GCP プロジェクトで作成する。 GCP の Pub/Sub ページに移動する 確認したい Pub/Sub topic をクリックする ページ下部にある CREATE SUBSCRIPTION ボタンを押すと選択肢で、Create subscription, Export to BigQuery, Export to Cloud Storageがあり、 Export to Cloud Storageを選択する。 BigQuery、 Google Cloud Storage への吐き出しを行い際に、自動的に subscription が生成される。 Export to Cloud Storage を選択すると、Text 形式か Abro 形式での出力にするかを選択できる。基本的には簡単に確認できる Text 形式を選ぶと良さげ。 選択後、下記のような設定画面が出てくるので情報を埋めていく。基本的には、どこの Google Cloud Storage に出力するかを埋めれば完了。 10m ほどすると Streaming job の Dataflow の起動が完了して、一定期間ごとに Pub/Sub の topic に公開されたデータがテキスト形式で出力され始めます。 出力された GCS の結果を眺めるには、 gsutil コマンドなどを使うのが簡単です。自分はgsutil cat の結果をコピーして VS Code で確認しています。 Cloud Dataflow のテンプレート機能については、端的に説明すると、GUI でパラメータを設定するだけで、Dataflow によるデータ処理が簡単に実行できるようになる機能です。...

November 5, 2021 Â· Shunya Ueta

CloudComposer のDAGをCircleCIで更新する

Cloud Composer(Airflow) の DAG を GitHub リポジトリで管理して、CI によりリポジトリで管理している DAG を Pull Request がマージされると Cloud Composer の DAG へ同期する方法について説明する。 DAG は、ルートディレクトリ直下の dags/ というディレクトリに格納されている状態を前提とする。 以下の2つのコマンドラインツールを利用して実現できる。 Service Account の認証のために gcloud DAG の同期のために gsutil CircleCI によるワークフローの記述例は以下のとおり 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 version:2.1jobs:rsync-dags:working_directory:~/workspacedocker:- image:gcr.io/google.com/cloudsdktool/cloud-sdk:alpineenvironment:GOOGLE_APPLICATION_CREDENTIALS:/gcp-service-key.jsonsteps:- checkout- run:name:SyncDAGfoldertoGCS'sDAGfoldercommand:| echo "${CLOUD_COMPOSER_CREDENTIALS_JSON}" > ${GOOGLE_APPLICATION_CREDENTIALS}gcloudauthactivate-service-account--key-file${GOOGLE_APPLICATION_CREDENTIALS}gsutil-mrsync-d-rdags\"$(gcloud composer environments describe {COMPOSER_NAME} --project={GCP_PROJECT} --location={REGION} --format="get(config....

October 4, 2021 Â· Shunya Ueta

GCPのCloud Composer のDAGを素早く・簡単にデバッグする

GCPでAirflow をマネージドサービスで使えるサービスで Cloud Composer が存在する。 BigQueryやBigTable, PubSub などGCPの各サービスをDAGとして定義してジョブを定期実行できるので非常に便利だが、その代わりDAGを実行するまで結果がわからないので、CloudComposer を一度実行するしか無いのでデバッグが困難になる傾向がある。 また、GitHubのリポジトリにDAGを保存して、CIでCloud Composerを更新するようしていると PRを都度作ってマージされないと確認できないという場合もある。 ローカルでDocker で走らせれば良いのじゃないかというツッコミがあると思いますが、結局 Cloud Composer 上での動作を確かめないといけないので、今回の記事を書くことにしました。 NOTE: 自分が使用しているComposerのversionはcomposer-1.8.0-Airflow-1.10.3 です。基本的にやれることは一緒だと思います。また、dev, prodで同一のDAGが走るCloud Composer を運用しているという前提です。 アプローチは2つ logger.info() を仕込んで、DAGのなかで何が起こっているかを理解する 1 2 3 4 5 import logging logger = logging.getLogger(__name__) logger.info() loggerをDAGを記述した Pythonファイルに仕込んで、内部で何が起こっているかを把握する。 各DAGのlogは、 GCPのCloud ComposerのページにアクセスしてAirflow webserver 列のボタンをクリックしてAirflowのWeb applicaiton にログイン 確認したいDAGをクリック DAG内のtask をクリックして表示されるモーダル内の View Logをクリックすると、loggerの情報が確認できる gstuil rsync コマンドでのGCSへのDAGの同期 gstuil rsyncコマンドを使うことで、リポジトリのDAGファイルをGCSに格納されている開発環境上のCloudComposer のDAGファイルに直接同期してPull Request マージ後のDAGの挙動を確認できる。 Cloud Composer のDAGは、自動作成されたGoogle Cloud Storage(GCS)に格納されており、GCSをCloud Composerが定期的に監視してCloud Composerを更新している。 つまり、GCS上のDAGファイルを直接更新してやるとそれがCloud Composerに反映される。 体感として2-3分に一度は監視されているので、ほぼ待ち状態がない状態で確認できて便利です。...

September 29, 2021 Â· Shunya Ueta

gcloud commands で PubSub に jsonファイルをメッセージとして公開 (Pusblish) する

gcloud commands で PubSub に json ファイルをメッセージとして公開 (Pusblish) する jq コマンドが必要になるが、一番簡単に実現できるのは 1 $ gcloud pubsub topics publish {PUBSUB_TOPIC_NAME} --message "$(cat {FILE_NAME} | jq -c)" jq コマンドの -c オプションは compact-output を意味している。デフォルトだと pretty-prints になってしまう。 それを避けるために-cオプションを使用している。 ref Publishing messages to topics Read a txt file JSON data to publish the messages in Cloud Pub Sub

September 7, 2021 Â· Shunya Ueta