CDAPのパイプラインをローカルで作成し、Cloud Data Fusionで実行する

こんにちは。香田です。

今回はローカル環境でCDAP Sandboxにてパイプラインを作成後、
GCPのCloud Data Fusionにパイプラインをインポートして実行する流れを解説していきます。

はじめに

はじめにCDAPについて紹介しておくと、

CDAPはデータ分析で必要なETLおよびELTのパイプラインを効率的に構築できるオープンソースで提供されているデータ統合サービスです。

GCPのCloud Data FusionはCDAPを利用したフルマネージドのクラウドネイティブなエンタープライズデータ統合サービスになります。

CDAPではスケーラブルで高可用性な構成を提供する場合、Hadoopプラットフォームが必要ですが開発用途として提供されているCDAP Sandboxを利用すると単一のマシンで検証可能となります。

また、CDAPで作成したパイプラインはCloud Data Fusionと互換性があるので、パイプラインの開発環境としてCDAP Sandboxを利用することが可能です。

CDAP Sandboxをローカルでセットアップ

CDAP Sandbox docker imageをインストールします。

$ docker pull caskdata/cdap-sandbox:6.1.1

CDAP Sandboxを起動します。

$ docker run -d --name cdap-sandbox -p 11011:11011 -p 11015:11015 caskdata/cdap-sandbox:6.1.1

CDAP UIに接続します。

$ open http://localhost:11011

下記のような画面が表示されたらOKです

cdap1

GCP サービスアカウントキーの作成

CDAP SandboxにてGCPのリソースを参照できるようサービスアカウントキーを作成します。

GCP [IAMと管理]の[サービスアカウント]より[サービス アカウントの作成]を選択します。
[サービスアカウント名]を入力し作成をクリックします。

iam1

ここではロールに[BigQuery管理者]と[ストレージ管理者]を設定しています。
問題なければ[続行]をクリックします。

iam2

[キーを作成]より[JSON]を選択し[作成]をクリックします。
ダウンロードされたJSONファイルは以降の手順で使用します。

iam3

ダウンロードしたサービスアカウントキーをCDAP Sandboxへコピーします。

$ docker cp <service-account-key>.json cdap-sandbox:/opt/cdap/sandbox/

Wranglerで変換処理の設定

CDAP UIへアクセスし[Wrangler]をクリックします。

[Add Connection]より[Google Cloud Storage]を選択します

cdap3

[Connection name]、[Project ID]、[Service account key file location]を入力し[Test Connection]にて問題なければ[Add Connection]をクリックします。

cdap4

対象のGCSバケットよりファイルを選択します。

cdap7

ここでは[Wrangler]にて下記のような変換処理を設定しています。
問題なければ[Create Pipeline]から[Batch pipeline]をクリックします。

cdap8

パイプライン作成

[Studio]の画面が表示されるので、パイプラインを作成していきます。

cdap9

[Sink]の[BigQuery]を選択し、[Wrangler]から矢印を[BigQuery]へドラッグします。

cdap10

[BigQuery]の[Properties]をクリックし、[Project ID]、[Dataset]、[Table]、[Service Account File Path]を入力します。

cdap11

パイプラインの名前を入力後[Save]し、[Deploy]をクリックします。
[Run]にてパイプライン実行後、[Status]が[Succeeded]になっていれば成功です。

cdap12

パイプラインのエクスポート

パイプラインをGCPのCloud Data Fusionでも利用できるようにプロパティを一部修正します。

[Actions]より[Duplicate]しパイプラインをコピーします。

cdap13

[GCS]の[Properties]をクリックし[Service Account File Path]を[auto-detect]に変更しておきます。

cdap14

同じく[BigQuery]の[Properties]をクリックし[Service Account File Path]を[auto-detect]に変更しておきます。

cdap15

[Export]を実行すると、パイプライン設定のJSONファイルがダウンロード可能となります。

cdap16

Cloud Data Fusionを作成

GCP [Data Fusion]の[インスタンスを作成]より、[インスタンス名]を設定し[作成]をクリックします。

datafusion1

Data Fusionのパイプラインへインポート

Data FusionのUIにて[Pipeline]の[List]より[Import]をクリックします。
CDAP SandboxよりエクスポートしたJSONファイルを選択します。

datafusion2

インポートする際に下記の用にアップデート画面がでた場合、[Fix All]をクリックします。

datafusion3

[Deploy]をクリックします。

datafusion4

[Run]にてパイプライン実行後、[Status]が[Succeeded]になっていれば成功です。

datafusion5

さいごに

CDAP Sandboxでパイプライン作成後、Cloud Data Fusionで実行させる流れですが如何でしたでしょうか。

ローカルでCDAP Sandboxを利用することで、パイプラインの検証ができるので
Cloud Data Fusionのコストを抑えることができそうですね。

最後までご覧頂きありがとうございます。

SNSでもご購読できます。