こんにちは、香田です。
Cloud Data Fusionではパイプライン等をHTTPで作成、管理できるCDAP REST APIが提供されています。
Cloud Data FusionはCDAPを利用したサービスなので、CDAPて提供されているAPIが同じように使用可能です。
今回、バッチ パイプラインをAPI経由で作成し実行する流れを紹介していこうかと思います。
はじめに
利用したパイプラインの内容は、GCSからWranglerで変換しBigQueryにロードするバッチパイプラインとなります。
また以降の手順で指定しているインスタンス名、パイプライン名などは下記で進めていきます。
項目 | 内容 |
---|---|
Cloud Data Fusion インスタンス名 | sandbox |
Cloud Data Fusion ロケーション | us-west1 |
ネームスペース名 | sandbox |
パイプライン名 | gcs-to-bq |
パイプライン設定ファイル名 | gcs-to-bq.json |
接続情報や認証情報を環境変数に設定します。
認証情報を環境変数を設定
export AUTH_TOKEN=$(gcloud auth print-access-token)
Cloud Data Fusion 接続情報を環境変数を設定
export INSTANCE_ID=sandbox
export LOCATION=us-west1
export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe --format="value(apiEndpoint)" --location=${LOCATION} ${INSTANCE_ID})
バージョン確認
Cloud Data Fusionのバージョンを確認します。
curl -X GET "${CDAP_ENDPOINT}/v3/version" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
Namespaceを作成
Namespaceを作成します。
curl -X PUT "${CDAP_ENDPOINT}/v3/namespaces/sandbox" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
作成したNamespaceを確認します。
curl -X GET "${CDAP_ENDPOINT}/v3/namespaces" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
パイプラインの作成
Batch Pipelineを作成します。実行後にDeploy Complete
と表示されれば成功です。
curl -X PUT "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq" \
-d @gcs-to-bq.json \
-H "Authorization: Bearer ${AUTH_TOKEN}"
Cloud Data Fusion ウェブUIより対象のパイプラインを確認すると、下記のように[Status]が[Deployed]状態になっていることが確認できます。
パイプラインの実行
次にパイプラインを実行します。
curl -X POST "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/start" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
パイプラインのステータス確認
パイプラインのステータスは下記で確認できます。
curl -X GET "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/status" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
パイプラインのログ確認
パイプラインのログは下記のようにUnixtimeを指定し確認できます。
curl -X GET "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/logs?start=1587724167&stop=1587724488" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
パイプラインの削除
不要になったらパイプラインは削除できます。
curl -X DELETE "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq" \
-H "Authorization: Bearer ${AUTH_TOKEN}"
さいごに
CDAP REST APIを使用してCloud Data Fusionのパイプライン作成は如何でしたでしょうか。
CDAP REST APIを利用すれば、CI/CDのパイプラインも組み立てられそうですね。
下記は今回参考にしたドキュメントなので、よかったら参考にしてみてください。
最後までご覧頂きありがとうございます!