
こんにちは。香田です。
今回はローカル環境でCDAP Sandboxにてパイプラインを作成後、
GCPのCloud Data Fusionにパイプラインをインポートして実行する流れを解説していきます。
はじめに
はじめにCDAPについて紹介しておくと、
CDAPはデータ分析で必要なETLおよびELTのパイプラインを効率的に構築できるオープンソースで提供されているデータ統合サービスです。
GCPのCloud Data FusionはCDAPを利用したフルマネージドのクラウドネイティブなエンタープライズデータ統合サービスになります。
CDAPではスケーラブルで高可用性な構成を提供する場合、Hadoopプラットフォームが必要ですが開発用途として提供されているCDAP Sandboxを利用すると単一のマシンで検証可能となります。
また、CDAPで作成したパイプラインはCloud Data Fusionと互換性があるので、パイプラインの開発環境としてCDAP Sandboxを利用することが可能です。
CDAP Sandboxをローカルでセットアップ
CDAP Sandbox docker imageをインストールします。
$ docker pull caskdata/cdap-sandbox:6.1.1
CDAP Sandboxを起動します。
$ docker run -d --name cdap-sandbox -p 11011:11011 -p 11015:11015 caskdata/cdap-sandbox:6.1.1
CDAP UIに接続します。
$ open http://localhost:11011
下記のような画面が表示されたらOKです

GCP サービスアカウントキーの作成
CDAP SandboxにてGCPのリソースを参照できるようサービスアカウントキーを作成します。
GCP [IAMと管理]の[サービスアカウント]より[サービス アカウントの作成]を選択します。
[サービスアカウント名]を入力し作成をクリックします。

ここではロールに[BigQuery管理者]と[ストレージ管理者]を設定しています。
問題なければ[続行]をクリックします。

[キーを作成]より[JSON]を選択し[作成]をクリックします。
ダウンロードされたJSONファイルは以降の手順で使用します。

ダウンロードしたサービスアカウントキーをCDAP Sandboxへコピーします。
$ docker cp <service-account-key>.json cdap-sandbox:/opt/cdap/sandbox/
Wranglerで変換処理の設定
CDAP UIへアクセスし[Wrangler]をクリックします。

[Add Connection]より[Google Cloud Storage]を選択します

[Connection name]、[Project ID]、[Service account key file location]を入力し[Test Connection]にて問題なければ[Add Connection]をクリックします。

対象のGCSバケットよりファイルを選択します。

ここでは[Wrangler]にて下記のような変換処理を設定しています。
問題なければ[Create Pipeline]から[Batch pipeline]をクリックします。

パイプライン作成
[Studio]の画面が表示されるので、パイプラインを作成していきます。

[Sink]の[BigQuery]を選択し、[Wrangler]から矢印を[BigQuery]へドラッグします。

[BigQuery]の[Properties]をクリックし、[Project ID]、[Dataset]、[Table]、[Service Account File Path]を入力します。

パイプラインの名前を入力後[Save]し、[Deploy]をクリックします。
[Run]にてパイプライン実行後、[Status]が[Succeeded]になっていれば成功です。

パイプラインのエクスポート
パイプラインをGCPのCloud Data Fusionでも利用できるようにプロパティを一部修正します。
[Actions]より[Duplicate]しパイプラインをコピーします。

[GCS]の[Properties]をクリックし[Service Account File Path]を[auto-detect]に変更しておきます。

同じく[BigQuery]の[Properties]をクリックし[Service Account File Path]を[auto-detect]に変更しておきます。

[Export]を実行すると、パイプライン設定のJSONファイルがダウンロード可能となります。

Cloud Data Fusionを作成
GCP [Data Fusion]の[インスタンスを作成]より、[インスタンス名]を設定し[作成]をクリックします。

Data Fusionのパイプラインへインポート
Data FusionのUIにて[Pipeline]の[List]より[Import]をクリックします。
CDAP SandboxよりエクスポートしたJSONファイルを選択します。

インポートする際に下記の用にアップデート画面がでた場合、[Fix All]をクリックします。

[Deploy]をクリックします。

[Run]にてパイプライン実行後、[Status]が[Succeeded]になっていれば成功です。

さいごに
CDAP Sandboxでパイプライン作成後、Cloud Data Fusionで実行させる流れですが如何でしたでしょうか。
ローカルでCDAP Sandboxを利用することで、パイプラインの検証ができるので
Cloud Data Fusionのコストを抑えることができそうですね。
最後までご覧頂きありがとうございます。