DataflowでPubSubのメッセージをGCSに書き込む

こんにちは、香田です。

今回はDataflowでPubSubのメッセージをGCSに書き込む方法を紹介していきます。

Python3の環境でApache Beam Python SDKを利用し、ストリーミング パイプラインを作成していきます。

Python 開発環境の設定

はじめにApache Beam SDKをインストールし開発環境を設定していきます。

  • venv仮想環境 初期化
$ python -m venv venv
$ source venv/bin/activate
  • パッケージ インストール
$ pip install apache-beam[gcp]

PubSub、GCSの作成

次にストリーミング パイプラインで利用するPubSubとGCSを作成していきます。

  • トピック作成
$ gcloud pubsub topics create sample-topic
  • サブスクリプション作成
$ gcloud pubsub subscriptions create sample-sub --topic=sample-topic
  • バケット作成
$ gsutil mb -l <location> gs://<bucket_name>

パイプラインを作成

パイプラインのサンプルコードは下記になります。下記のコードをコピーしpubsub_to_gcs.pyという名前で保存します。

import argparse
import datetime
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window


class FormatMessage(beam.DoFn):
    def process(self, message, publish_time=beam.DoFn.TimestampParam):
        yield {
            'data': message.data.decode('utf-8'),
            'attributes': message.attributes,
            'publish_time': datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


class WriteToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        ts_format = "%Y%m%d%H%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))


def run(input_subscription, output_path, window_size=1.0, pipeline_args=None):
    pipeline_options = PipelineOptions(
        pipeline_args,
        streaming=True,
        save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(
                subscription=input_subscription,
                with_attributes=True
            )
            | "Window into Fixed Intervals" >> beam.WindowInto(
                window.FixedWindows(int(window_size * 60))
            )
            | "Format Message" >> beam.ParDo(FormatMessage())
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
            | "Write to GCS" >> beam.ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help="The Cloud Pub/Sub subscription to read from.\n"
        '"projects/<PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_subscription,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Direct Runnerでパイプラインを実行

パイプラインをローカルでテスト実行できるDirect Runnerを利用し、PubSubのメッセージがGCSバケットへ書き込まれるか確認していきます。

  • パイプラインで利用する設定を環境変数に設定します。
export PROJECT_NAME=<GCP Project Name>
export BUCKET_NAME=<bucket_name>
export SUBSCRIPTION_NAME=sample-sub
export RUNNER=DirectRunner
  • DirectRunnerでパイプラインを実行します。3分間隔でバケットにファイルが書き込まれる用にwindow_sizeを3に設定しています。
$ python pubsub_to_gcs.py \
  --project=${PROJECT_NAME} \
  --input_subscription=projects/$PROJECT_NAME/subscriptions/${SUBSCRIPTION_NAME} \
  --output_path=gs://${BUCKET_NAME}/dataflow/outputs/data \
  --runner=${RUNNER} \
  --window_size=3 \
  --temp_location=gs://${BUCKET_NAME}/dataflow/temp
  • PubSubへ10分間メッセージを送信します。
$ for i in $(seq 1 600); do
    gcloud pubsub topics publish sample-topic --message "No.$i test message" --attribute "service=iot, region=tokyo"
    sleep 1
done
  • gsutilコマンドで確認すると送信したメッセージが確認できるはずです。
$ gsutil cat gs://<bucket_name>/dataflow/outputs/data-*

Dataflow サービスでパイプラインを実行

次にパイプラインをDataflow上で実行します。

  • Dataflowを利用するにように環境変数に設定します。
export RUNNER=DataflowRunner
  • Dataflowでパイプラインを実行します。
$ python pubsub_to_gcs.py \
  --project=${PROJECT_NAME} \
  --input_subscription=projects/$PROJECT_NAME/subscriptions/${SUBSCRIPTION_NAME} \
  --output_path=gs://${BUCKET_NAME}/dataflow/outputs/data \
  --runner=${RUNNER} \
  --window_size=3 \
  --temp_location=gs://${BUCKET_NAME}/dataflow/temp
  • しばらくすると下記のようにDataflowで実行されていることが確認できるはずです。
  • PubSubへ10分間メッセージを送信します。
$ for i in $(seq 1 600); do
    gcloud pubsub topics publish sample-topic --message "No.$i test message" --attribute "service=iot, region=tokyo"
    sleep 1
done

パイプラインの停止

Dataflowで実行中のパイプラインは下記のように[ジョブグラフ]より停止可能です。

さいごに

DataflowでPubSubのメッセージをGCSに書き込む方法いかがでしたしょうか?

Python3でDataflow ストリーミング パイプラインを利用する際、本記事が参考になれば幸いでございます。

最後までご覧頂きありがとうございます。

SNSでもご購読できます。