Mautic REST APIよりコンタクト情報を取得しBigQueryへ読み込む

こんにちは、香田です。

今回はMauticのREST APIよりコンタクト情報を取得しBigQueryへ読み込む方法について紹介していきます。

流れとしてはMautic REST APIを利用し、Pythonでコンタクト情報を取得しGCSへ保存します。

GCS保存後、BigQueryへ読み込む流れとなります。

Python 開発環境の設定

Mautic コンタクト情報取得の為、Pythonの開発環境を設定していきます。

venv仮想環境を初期化します。

$ python -m venv venv
$ source venv/bin/activate

取得したデータをGoogle Cloud Storageに保存する為、別途パッケージをインストールします。

$ pip install google-cloud-storage

Mautic コンタクト情報の取得

Mautic コンタクト情報取得のPython サンプルコードは下記になります。

コンタクト情報はemailアドレスが登録されているものに限定して取得するようにしています。

下記のコードをコピーしmain.pyという名前で保存します。

import urllib.request
import json
import base64
import os
from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)


def main():
    user = os.environ['USER']
    password = os.environ['PASSWORD']
    url = os.environ['URL']
    bucket = os.environ['BUCKET']
    dest_folder = os.environ['DEST_FOLDER']
    api_endpoint = '%s/api/contacts' % (url)
    limit = 100

    params = {
        'where[0][col]': 'email',
        'where[0][expr]': 'isNotNull',
        'limit': limit,
        'orderBy': 'id',
    }
    basic_user_and_pasword = base64.b64encode(
        '{}:{}'.format(user, password).encode('utf-8'))
    request_url = '%s?%s' % (
        api_endpoint, urllib.parse.urlencode(params))
    req = urllib.request.Request(request_url,
                                 headers={"Authorization": "Basic " +
                                          basic_user_and_pasword.decode(
                                              'utf-8')
                                          })
    res = urllib.request.urlopen(req)
    body = res.read()
    data = json.loads(body)
    count = int(data['total']) // limit
    start = 0

    for i in range(count + 1):
        params.update(start=start)
        request_url = '%s?%s' % (
            api_endpoint, urllib.parse.urlencode(params))
        req = urllib.request.Request(request_url,
                                     headers={"Authorization": "Basic " +
                                              basic_user_and_pasword.decode(
                                                  'utf-8')
                                              })
        res = urllib.request.urlopen(req)
        body = res.read()
        data = json.loads(body)
        try:
            res = urllib.request.urlopen(req)
            body = res.read()
            data = json.loads(body)
            file_name = 'contacts-%d.json' % (i)
            for k in data['contacts']:
                v = data['contacts'][k]
                with open(file_name, 'a') as file:
                    message = '%s\n' % (json.dumps(v, ensure_ascii=False))
                    file.write(message)
            dest_file = '%s/%s' % (dest_folder,  file_name)
            upload_blob(bucket, file_name, dest_file)
            os.remove(file_name)

        except urllib.error.URLError as e:
            print(e.reason)

        start += limit


if __name__ == "__main__":
    main()

サンプルコードで利用する環境変数を設定していきます。各環境変数の内容は下記にになります。

環境変数名内容
USERMautic ユーザー名
PASSWORDMautic パスワード
URLMautic サイトURL
BUCKETGCSバケット名
DEST_FOLDERGCSバケット保存先

下記のように環境変数を適宜セットします。

export USER='test@example.com'
export PASSWORD='test'
export URL='https://example.com'
export BUCKET='example-bucket'
export DEST_FOLDER='mautic/contacts'

サンプルコードを実行します。

$ python main.py

設定したバケットにデータが保存されていれば成功です。

BigQueryへデータ読み込み

BigQuery データセットを作成します。GCSからBigQueryへデータ読み込みできるようバケットと同じロケーションに設定します。

$ bq --location=asia-northeast1 mk --dataset mautic

スキーマの自動検出を有効化し、BigQueryへロードします。

$ bq load \
--autodetect \
--replace \
--source_format=NEWLINE_DELIMITED_JSON \
mautic.contacts \
gs://<GCS バケット名>/mautic/contacts/contacts-0.json

成功するとテーブルが作成され、データが読み込まれているはずです。

下記のようなクエリを実行することで、コンタクト情報が取得可能です。

SELECT
  fields.all.id,
  fields.all.lastname,
  fields.all.firstname,
  fields.all.email,
  fields.all.phone,
  fields.all.company,
  fields.all.country,
  fields.all.city,
  fields.all.last_active,
  fields.all.points,
FROM
  `<project_id>.mautic.contacts`

さいごに

Mautic REST APIよりコンタクト情報を取得しBigQueryへ読み込む方法いかがでしたでしょうか。

最後までご覧いただきありがとうございます。

SNSでもご購読できます。