Cloud Data FusionのCDAP REST APIでパイプラインを作成し実行する方法

こんにちは、香田です。

Cloud Data Fusionではパイプライン等をHTTPで作成、管理できるCDAP REST APIが提供されています。

Cloud Data FusionはCDAPを利用したサービスなので、CDAPて提供されているAPIが同じように使用可能です。

今回、バッチ パイプラインをAPI経由で作成し実行する流れを紹介していこうかと思います。

はじめに

利用したパイプラインの内容は、GCSからWranglerで変換しBigQueryにロードするバッチパイプラインとなります。

cdap1

また以降の手順で指定しているインスタンス名、パイプライン名などは下記で進めていきます。

項目内容
Cloud Data Fusion インスタンス名sandbox
Cloud Data Fusion ロケーションus-west1
ネームスペース名sandbox
パイプライン名gcs-to-bq
パイプライン設定ファイル名gcs-to-bq.json

接続情報や認証情報を環境変数に設定します。

認証情報を環境変数を設定

export AUTH_TOKEN=$(gcloud auth print-access-token)

Cloud Data Fusion 接続情報を環境変数を設定

export INSTANCE_ID=sandbox
export LOCATION=us-west1
export CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe --format="value(apiEndpoint)" --location=${LOCATION} ${INSTANCE_ID})

バージョン確認

Cloud Data Fusionのバージョンを確認します。

curl -X GET "${CDAP_ENDPOINT}/v3/version" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

Namespaceを作成

Namespaceを作成します。

curl -X PUT "${CDAP_ENDPOINT}/v3/namespaces/sandbox" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

作成したNamespaceを確認します。

curl -X GET "${CDAP_ENDPOINT}/v3/namespaces" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

パイプラインの作成

Batch Pipelineを作成します。実行後にDeploy Completeと表示されれば成功です。

curl -X PUT "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq" \
-d @gcs-to-bq.json \
-H "Authorization: Bearer ${AUTH_TOKEN}"

Cloud Data Fusion ウェブUIより対象のパイプラインを確認すると、下記のように[Status]が[Deployed]状態になっていることが確認できます。

cdf1

パイプラインの実行

次にパイプラインを実行します。

curl -X POST "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/start" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

パイプラインのステータス確認

パイプラインのステータスは下記で確認できます。

curl -X GET "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/status" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

パイプラインのログ確認

パイプラインのログは下記のようにUnixtimeを指定し確認できます。

curl -X GET "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq/workflows/DataPipelineWorkflow/logs?start=1587724167&stop=1587724488" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

パイプラインの削除

不要になったらパイプラインは削除できます。

curl -X DELETE "${CDAP_ENDPOINT}/v3/namespaces/sandbox/apps/gcs-to-bq" \
-H "Authorization: Bearer ${AUTH_TOKEN}"

さいごに

CDAP REST APIを使用してCloud Data Fusionのパイプライン作成は如何でしたでしょうか。

CDAP REST APIを利用すれば、CI/CDのパイプラインも組み立てられそうですね。

下記は今回参考にしたドキュメントなので、よかったら参考にしてみてください。

最後までご覧頂きありがとうございます!

SNSでもご購読できます。