こんにちは、香田です。
今回はDataflowでPubSubのメッセージをGCSに書き込む方法を紹介していきます。
Python3の環境でApache Beam Python SDKを利用し、ストリーミング パイプラインを作成していきます。
Python 開発環境の設定
はじめにApache Beam SDK
をインストールし開発環境を設定していきます。
- venv仮想環境 初期化
$ python -m venv venv
$ source venv/bin/activate
- パッケージ インストール
$ pip install apache-beam[gcp]
PubSub、GCSの作成
次にストリーミング パイプラインで利用するPubSubとGCSを作成していきます。
- トピック作成
$ gcloud pubsub topics create sample-topic
- サブスクリプション作成
$ gcloud pubsub subscriptions create sample-sub --topic=sample-topic
- バケット作成
$ gsutil mb -l <location> gs://<bucket_name>
パイプラインを作成
パイプラインのサンプルコードは下記になります。下記のコードをコピーしpubsub_to_gcs.py
という名前で保存します。
import argparse
import datetime
import json
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window
class FormatMessage(beam.DoFn):
def process(self, message, publish_time=beam.DoFn.TimestampParam):
yield {
'data': message.data.decode('utf-8'),
'attributes': message.attributes,
'publish_time': datetime.datetime.utcfromtimestamp(
float(publish_time)
).strftime("%Y-%m-%d %H:%M:%S.%f"),
}
class WriteToGCS(beam.DoFn):
def __init__(self, output_path):
self.output_path = output_path
def process(self, batch, window=beam.DoFn.WindowParam):
ts_format = "%Y%m%d%H%M"
window_start = window.start.to_utc_datetime().strftime(ts_format)
window_end = window.end.to_utc_datetime().strftime(ts_format)
filename = "-".join([self.output_path, window_start, window_end])
with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
for element in batch:
f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
def run(input_subscription, output_path, window_size=1.0, pipeline_args=None):
pipeline_options = PipelineOptions(
pipeline_args,
streaming=True,
save_main_session=True
)
with beam.Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Read PubSub Messages' >> beam.io.ReadFromPubSub(
subscription=input_subscription,
with_attributes=True
)
| "Window into Fixed Intervals" >> beam.WindowInto(
window.FixedWindows(int(window_size * 60))
)
| "Format Message" >> beam.ParDo(FormatMessage())
| "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
| "Groupby" >> beam.GroupByKey()
| "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
| "Write to GCS" >> beam.ParDo(WriteToGCS(output_path))
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_subscription",
help="The Cloud Pub/Sub subscription to read from.\n"
'"projects/<PROJECT_NAME>/subscriptions/<SUBSCRIPTION_NAME>".',
)
parser.add_argument(
"--window_size",
type=float,
default=1.0,
help="Output file's window size in number of minutes.",
)
parser.add_argument(
"--output_path",
help="GCS Path of the output file including filename prefix.",
)
known_args, pipeline_args = parser.parse_known_args()
run(
known_args.input_subscription,
known_args.output_path,
known_args.window_size,
pipeline_args,
)
Direct Runnerでパイプラインを実行
パイプラインをローカルでテスト実行できるDirect Runner
を利用し、PubSubのメッセージがGCSバケットへ書き込まれるか確認していきます。
- パイプラインで利用する設定を環境変数に設定します。
export PROJECT_NAME=<GCP Project Name>
export BUCKET_NAME=<bucket_name>
export SUBSCRIPTION_NAME=sample-sub
export RUNNER=DirectRunner
- DirectRunnerでパイプラインを実行します。3分間隔でバケットにファイルが書き込まれる用に
window_size
を3に設定しています。
$ python pubsub_to_gcs.py \
--project=${PROJECT_NAME} \
--input_subscription=projects/$PROJECT_NAME/subscriptions/${SUBSCRIPTION_NAME} \
--output_path=gs://${BUCKET_NAME}/dataflow/outputs/data \
--runner=${RUNNER} \
--window_size=3 \
--temp_location=gs://${BUCKET_NAME}/dataflow/temp
- PubSubへ10分間メッセージを送信します。
$ for i in $(seq 1 600); do
gcloud pubsub topics publish sample-topic --message "No.$i test message" --attribute "service=iot, region=tokyo"
sleep 1
done
- gsutilコマンドで確認すると送信したメッセージが確認できるはずです。
$ gsutil cat gs://<bucket_name>/dataflow/outputs/data-*
Dataflow サービスでパイプラインを実行
次にパイプラインをDataflow上で実行します。
- Dataflowを利用するにように環境変数に設定します。
export RUNNER=DataflowRunner
- Dataflowでパイプラインを実行します。
$ python pubsub_to_gcs.py \
--project=${PROJECT_NAME} \
--input_subscription=projects/$PROJECT_NAME/subscriptions/${SUBSCRIPTION_NAME} \
--output_path=gs://${BUCKET_NAME}/dataflow/outputs/data \
--runner=${RUNNER} \
--window_size=3 \
--temp_location=gs://${BUCKET_NAME}/dataflow/temp
- しばらくすると下記のようにDataflowで実行されていることが確認できるはずです。
- PubSubへ10分間メッセージを送信します。
$ for i in $(seq 1 600); do
gcloud pubsub topics publish sample-topic --message "No.$i test message" --attribute "service=iot, region=tokyo"
sleep 1
done
パイプラインの停止
Dataflowで実行中のパイプラインは下記のように[ジョブグラフ]より停止可能です。
さいごに
DataflowでPubSubのメッセージをGCSに書き込む方法いかがでしたしょうか?
Python3でDataflow ストリーミング パイプラインを利用する際、本記事が参考になれば幸いでございます。
最後までご覧頂きありがとうございます。