Mauticのコンタクト ページヒットイベントをBigQueryへ読み込む

こんにちは、香田です。

今回はMauticのコンタクト ページヒットイベントをBigQueryへ読み込む方法について紹介していきます。

流れとしては、Mauticのコンタクト アクティビティイベントをREST APIを利用しPythonで取得し、最終的にBigQueryへデータを読み込む内容となります。

Mautic API 設定の有効化

はじめにMauticのAPI設定を有効化していきます。以前下記の記事に記載しましたので参考にしてみてください。

API 設定を有効化したあと、curlコマンドを実行しコンタクトのアクティビティイベント APIが正しく取得できれば成功です。

$ curl -G -s -X GET https://<Mautic サイトURL>/api/contacts/activity \
-d 'filters[includeEvents][]=page.hit' \
-d 'filters[dateFrom]=2020-10-01' \
-d 'filters[dateTo]=2020-10-25' \
-u "<ユーザー名>:<パスワード>"

Python 開発環境の設定

次にMautic イベントデータの取得はPythonを利用する為、開発環境を設定していきます。

venv仮想環境を初期化します。

$ python -m venv venv
$ source venv/bin/activate

取得したデータは事前にGoogle Cloud Storageに保存する為、別途パッケージをインストールします。

$ pip install google-cloud-storage

Mautic ページヒット イベントの取得

Mautic ページヒット イベント取得のPython サンプルコードは下記になります。

下記のコードをコピーしmain.pyという名前で保存します。

import urllib.request
import json
import base64
import os
from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def main():
    user = os.environ['USER']
    password = os.environ['PASSWORD']
    url = os.environ['URL']
    date_from = os.environ['DATE_FROM']
    date_to = os.environ['DATE_TO']
    bucket = os.environ['BUCKET']
    dest_folder = os.environ['DEST_FOLDER']
    api_endpoint = '%s/api/contacts/activity' % (url)
    params = {
        'filters[includeEvents][]': 'page.hit',
        'filters[dateFrom]': date_from,
        'filters[dateTo]': date_to
    }
    basic_user_and_pasword = base64.b64encode(
        '{}:{}'.format(user, password).encode('utf-8'))
    request_url = '%s?%s' % (
        api_endpoint, urllib.parse.urlencode(params))
    req = urllib.request.Request(request_url,
                                 headers={"Authorization": "Basic " +
                                          basic_user_and_pasword.decode(
                                              'utf-8')
                                          })
    res = urllib.request.urlopen(req)
    body = res.read()
    data = json.loads(body)
    max_page = data['maxPages']

    for page in range(1, int(max_page)+1):
        params.update(page=page)
        request_url = '%s?%s' % (
            api_endpoint, urllib.parse.urlencode(params))
        req = urllib.request.Request(request_url,
                                     headers={"Authorization": "Basic " +
                                              basic_user_and_pasword.decode(
                                                  'utf-8')
                                              })
        try:
            res = urllib.request.urlopen(req)
            body = res.read()
            data = json.loads(body)
            file_name = '%s_%s_events%d.json' % (date_from, date_to, page)
            for e in data['events']:
                with open(file_name, 'a') as file:
                    message = '%s\n' % (json.dumps(e, ensure_ascii=False))
                    file.write(message)
            dest_file = '%s/%s' % (dest_folder, file_name)
            upload_blob(bucket, file_name, dest_file)
            os.remove(file_name)

        except urllib.error.URLError as e:
            print(e.reason)


if __name__ == "__main__":
    main()

サンプルコードで利用する環境変数を設定していきます。各環境変数の内容は下記にになります。

環境変数名内容
USERMautic ユーザー名
PASSWORDMautic パスワード
URLMautic サイトURL
DATE_FROM取得対象の開始日
DATE_TO取得対象の終了日
BUCKETGCSバケット名
DEST_FOLDERGCSバケット保存先

下記のように環境変数を適宜セットします。

export USER='test@example.com'
export PASSWORD='test'
export URL='https://example.com'
export DATE_FROM='2020-10-01'
export DATE_TO='2020-10-25'
export BUCKET='example-bucket'
export DEST_FOLDER='mautic/contacts_activity'

サンプルコードを実行します。

$ python main.py

設定したバケットにデータが保存されていれば成功です。

BigQuery JSON スキーマファイルの作成

下記のBigQuery JSON スキーマファイル生成ツールを利用し、スキーマファイルを作成します。

bigquery_schema_generatorのインストール

$ pip install bigquery_schema_generator
$ source venv/bin/activate

GCSに保存されたファイルを元にツール実行すると、スキーマファイルが作成されます。

$ gsutil cat gs://<GCS バケット名>/mautic/contacts_activity/2020-10-01_2020-10-05_events1.json | generate-schema > schema.json

BigQueryへデータ読み込み

BigQuery データセットを作成します。GCSからBigQueryへデータ読み込みできるようバケットと同じロケーションに設定します。

$ bq --location=asia-northeast1 mk --dataset mautic

スキーマファイルを利用しBigQueryへデータを読み込みます。

注意点として、Mautic ページヒットイベントのJSONデータは内容によってキーが異なることがあります。

今回はignore_unknown_valuesオプションを指定し、作成したテーブル スキーマと一致しない余分なキーを持つ行を無視する用に設定しています。

$ bq load \
--source_format=NEWLINE_DELIMITED_JSON \
--ignore_unknown_values \
mautic.contacts_activity \
gs://<GCS バケット名>/mautic/contacts_activity/* \
schema.json

成功するとテーブルが作成され、データが読み込まれているはずです。

下記のようにクエリを実行してみて、結果が確認できれば成功です。

SELECT
  timestamp,
  details.hit.device,
  contactId,
  details.hit.query.page_url,
FROM
  `<project_id>.mautic.contacts_activity`
WHERE
  details.hit.query.page_url LIKE '%gclid%'
ORDER BY
  timestamp DESC;

さいごに

Mauticのコンタクト ページヒットイベントをBigQueryへ読み込む方法いかがでしたでしょうか。

Mauticではコンタクト アクティビティイベントの他にもREST APIが公開されている為、よかったら下記のドキュメントを参考にしてみてください。

最後までご覧いただきありがとうございます。

SNSでもご購読できます。