data:image/s3,"s3://crabby-images/9f1df/9f1df519987e53f9fd47bae779e6ee3a1a3d03b3" alt=""
こんにちは、香田です。
GCPのサービスを利用して、リアルタイムにデータ変換処理のパイプラインを実現する場合、Cloud Dataflowを利用することが多いのではないでしょうか。
例えば、DataflowでPubSubからメッセージを読み込み、BigQueryにロードするといった処理がテンプレートとしても用意されているので、要件にマッチしていれば簡単に導入できるので非常に有用ではないでしょうか。
データパイプラインのサービスに関連して、GCPではGUIで簡単に作成できるCloud Data Fusionも提供されています。
Cloud Data Fusionではパイプラインとしてバッチ パイプラインとリアルタイム パイプラインを提供しています。
今回はリアルタイム パイプラインの作成を紹介していきます。
はじめに
Cloud Data Fusionでリアルタイム パイプラインを利用するには、Enterprise エディションを利用する必要があります。
Enterprise エディションは1時間($4.20)程費用が発生する為、パイプラインを開発する時はローカル環境でCDAP Sandboxを利用したほうがコストを節約できるのでオススメです。
CDAP Sandboxについては下記の記事を参考にしてみてください。
CDAPのパイプラインをローカルで作成し、Cloud Data Fusionで実行する
作業の流れとしては、パイプラインの開発はCDAP Sandboxで開発&検証し、最後にCloud Data Fusionにデプロイする流れとなります。
今回、PubSubから受信するデータは下記のようなメッセージを想定して、パイプラインの変換処理を進めています。
$ gcloud pubsub subscriptions pull --auto-ack my-sub
┌───────────────────┬──────────────────┬──────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
├───────────────────┼──────────────────┼──────────────┤
│ No.1 test message │ 1125074670101625 │ region=tokyo │
└───────────────────┴──────────────────┴──────────────┘
Studioでリアルタイムのパイプラインを選択
CDAP UIへアクセスし[Integrate]の[Studio]をクリックします。
data:image/s3,"s3://crabby-images/d5169/d5169a8f33f5168ed2a16b66ec79ed0d40f818dd" alt="cdap1"
[Data Pipeline – Realtime]に変更します。
data:image/s3,"s3://crabby-images/3204c/3204cb8c158c88c66338924bdbce9ec12746703b" alt="cdap2"
Source PubSub 設定
[Source]より[PubSub]を選択し[Properties]をクリックします。
data:image/s3,"s3://crabby-images/53c84/53c843cf3e4fb6bf05a074e4cb0a5e31e6bf2dc5" alt="cdap3"
[Reference Name]、[Project ID]、[Subscription]、[Topic]、[Service Account File Path]
を入力します。
data:image/s3,"s3://crabby-images/55c60/55c608c0fa893a319dc0c006db2463ccb6afd512" alt="cdap4"
Transform Projection 設定
PubSubのmessageはbyte型になっている為、TransformのProjectionを利用し、string型に変換します。
[Transform]より[Projection]を選択し[Properties]をクリックます。
data:image/s3,"s3://crabby-images/48657/486570c47b568e936a65207761e3dd47d7cd3a75" alt="cdap5"
[Projection Configuration]の[Convert]に[message]と入力し、[string]を選択し[Validate]よりチェックします。
data:image/s3,"s3://crabby-images/e76b5/e76b5a5d0a397bf7082dfa9fe124fb49b452a602" alt="cdap6"
Transform JSONFormatter 設定
PubSubのattributes Fieldはmap型になっている為、TransformのJSONFormatterを利用し、レコードを一度JSONオブジェクトにフォーマットします。
[Transform]より[JSONFormatter]を選択し[Properties]をクリックます。
data:image/s3,"s3://crabby-images/7c539/7c539164a671918980512c465ffd07b36d80d8ff" alt="cdap7"
[Output Schema]は単一フィールドのみとなる為、[Name]に[data]と入力し、[Type]は[string]に変更し[Validate]よりチェックします。
data:image/s3,"s3://crabby-images/3b66b/3b66bd44a9d8109f33acf947c24232b64e9175b2" alt="cdap8"
Transform JSONParser 設定
Transform JSONParserを利用し、JSONオブジェクトをレコードにパースしていきます。
JSONFormatterで処理する前のレコードと異なる点は、JSONParserによってattributesのKeyを取り出しfieldに変換しているところです。
[Transform]より[JSONParser]を選択し[Properties]をクリックます。
data:image/s3,"s3://crabby-images/5d846/5d8467a2707b5732bc21c98dc2e6178916a69dd2" alt="cdap9"
[JSON Parser]の[Input Field]に[data]と入力します。
[Json Path Mappings]、[OutPut Schema]は下記のように設定し[Validate]よりチェックします。
data:image/s3,"s3://crabby-images/4eaeb/4eaeb90fa435143ffce8e37fd3c5f02b3fc71862" alt="cdap10"
Sink BigQuery 設定
[Sink]より[BigQuery]を選択し[Properties]をクリックします。
data:image/s3,"s3://crabby-images/f857b/f857be6eb24c62eaf68b232ac80d47c4daf5f41a" alt="cdap11"
[Reference Name]、[Project ID]、[Dataset]、[Table]、[Temporary Bucket Name]、[Service Account File Path]を入力します。
data:image/s3,"s3://crabby-images/0f1ce/0f1cebf37847ea98b42d27cd8021349fa27376a4" alt="cdap12"
CDAP Sandboxでリアルタイムパイプラインの実行
パイプラインの名前を入力し、[Save]し[Deploy]をクリックします。
data:image/s3,"s3://crabby-images/fb5ee/fb5eea9ca646e3f400e6326032d04c8b5b557db2" alt="cdap13"
[Run]をクリックし、[Status]が[Running]になっていることを確認します。
data:image/s3,"s3://crabby-images/549e0/549e08d8c66843e611e6f54c7e09d95d77207a29" alt="cdap14"
PubSubへテストメッセージの送信
pubsubへテストメッセージを送信します。
for i in `seq 1 50`
do
gcloud pubsub topics publish my-topic --message "No.$i test message" --attribute "region=tokyo"
sleep 1
done
Sinkで設定したBigQueryのテーブルにクエリし、下記のような結果が確認できれば成功です。
data:image/s3,"s3://crabby-images/af04d/af04de63ef48c89c05846ce77bce7ab86baeb148" alt="bq1"
パイプライン設定のエクスポート
[Actions]より[Export]してパイプライン設定をエクスポートします。
data:image/s3,"s3://crabby-images/139d6/139d6f564f6f29fb654b1600102dfb915dc8a5a6" alt="cdap15"
Cloud Data Fusionにインポート
Enterprise エディションでCloud Data Fusionを起動し、パイプライン設定をインポートします。
Source PubSubとSink BigQueryの[Properties]をクリックし、[Project ID]、[Service Account File Path]を[auto-detect]に変更しておきます。
data:image/s3,"s3://crabby-images/82044/820441361caeedd0c42d2ce36dbcf987fd155c92" alt="cdf1"
Cloud Data Fusionでリアルタイムパイプラインの実行
[Deploy]後に[Run]をクリックし[Status]が[Running]になっていることを確認します。
data:image/s3,"s3://crabby-images/2e926/2e926c4c02dcbd7f5c92a7b56550bd6d9ab3c882" alt="cdf2"
下記の用にDataproc ClusterがMaster 1台、Worker 2台起動していることが確認できます。
data:image/s3,"s3://crabby-images/3c428/3c4289bb13d9d19eb422453363c61c52114e7017" alt="dataproc1"
CDAP Sandoxで検証した時と同様に、PubSubへテストメッセージを送信しBigQueryで同じような結果が確認できれば成功です。
さいごに
Cloud Data Fusionでリアルタイムのパイプラインを作成し実行してみましたが、如何でしたでしょうか。
変換処理も含めたデータパイプラインがGUIでノンコーティングで組み立てられるのは便利でよいですね。
補足ですが、リアルタイム パイプラインではパイプラインを削除することで、Dataproc Clusterが削除されます。
Cloud Data Fusionインスタンスを先に削除してしまうと、Dataproc Clusterは残ったままになるので注意が必要ですね。
最後までご覧頂きありがとうございます!