こんにちは、香田です。
今回はMauticのREST APIよりコンタクト情報を取得しBigQueryへ読み込む方法について紹介していきます。
流れとしてはMautic REST APIを利用し、Pythonでコンタクト情報を取得しGCSへ保存します。
GCS保存後、BigQueryへ読み込む流れとなります。
Python 開発環境の設定
Mautic コンタクト情報取得の為、Pythonの開発環境を設定していきます。
venv仮想環境を初期化します。
$ python -m venv venv
$ source venv/bin/activate
取得したデータをGoogle Cloud Storageに保存する為、別途パッケージをインストールします。
$ pip install google-cloud-storage
Mautic コンタクト情報の取得
Mautic コンタクト情報取得のPython サンプルコードは下記になります。
コンタクト情報はemailアドレスが登録されているものに限定して取得するようにしています。
下記のコードをコピーしmain.py
という名前で保存します。
import urllib.request
import json
import base64
import os
from google.cloud import storage
def upload_blob(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
def main():
user = os.environ['USER']
password = os.environ['PASSWORD']
url = os.environ['URL']
bucket = os.environ['BUCKET']
dest_folder = os.environ['DEST_FOLDER']
api_endpoint = '%s/api/contacts' % (url)
limit = 100
params = {
'where[0][col]': 'email',
'where[0][expr]': 'isNotNull',
'limit': limit,
'orderBy': 'id',
}
basic_user_and_pasword = base64.b64encode(
'{}:{}'.format(user, password).encode('utf-8'))
request_url = '%s?%s' % (
api_endpoint, urllib.parse.urlencode(params))
req = urllib.request.Request(request_url,
headers={"Authorization": "Basic " +
basic_user_and_pasword.decode(
'utf-8')
})
res = urllib.request.urlopen(req)
body = res.read()
data = json.loads(body)
count = int(data['total']) // limit
start = 0
for i in range(count + 1):
params.update(start=start)
request_url = '%s?%s' % (
api_endpoint, urllib.parse.urlencode(params))
req = urllib.request.Request(request_url,
headers={"Authorization": "Basic " +
basic_user_and_pasword.decode(
'utf-8')
})
res = urllib.request.urlopen(req)
body = res.read()
data = json.loads(body)
try:
res = urllib.request.urlopen(req)
body = res.read()
data = json.loads(body)
file_name = 'contacts-%d.json' % (i)
for k in data['contacts']:
v = data['contacts'][k]
with open(file_name, 'a') as file:
message = '%s\n' % (json.dumps(v, ensure_ascii=False))
file.write(message)
dest_file = '%s/%s' % (dest_folder, file_name)
upload_blob(bucket, file_name, dest_file)
os.remove(file_name)
except urllib.error.URLError as e:
print(e.reason)
start += limit
if __name__ == "__main__":
main()
サンプルコードで利用する環境変数を設定していきます。各環境変数の内容は下記にになります。
環境変数名 | 内容 |
---|---|
USER | Mautic ユーザー名 |
PASSWORD | Mautic パスワード |
URL | Mautic サイトURL |
BUCKET | GCSバケット名 |
DEST_FOLDER | GCSバケット保存先 |
下記のように環境変数を適宜セットします。
export USER='test@example.com'
export PASSWORD='test'
export URL='https://example.com'
export BUCKET='example-bucket'
export DEST_FOLDER='mautic/contacts'
サンプルコードを実行します。
$ python main.py
設定したバケットにデータが保存されていれば成功です。
BigQueryへデータ読み込み
BigQuery データセットを作成します。GCSからBigQueryへデータ読み込みできるようバケットと同じロケーションに設定します。
$ bq --location=asia-northeast1 mk --dataset mautic
スキーマの自動検出を有効化し、BigQueryへロードします。
$ bq load \
--autodetect \
--replace \
--source_format=NEWLINE_DELIMITED_JSON \
mautic.contacts \
gs://<GCS バケット名>/mautic/contacts/contacts-0.json
成功するとテーブルが作成され、データが読み込まれているはずです。
下記のようなクエリを実行することで、コンタクト情報が取得可能です。
SELECT
fields.all.id,
fields.all.lastname,
fields.all.firstname,
fields.all.email,
fields.all.phone,
fields.all.company,
fields.all.country,
fields.all.city,
fields.all.last_active,
fields.all.points,
FROM
`<project_id>.mautic.contacts`
さいごに
Mautic REST APIよりコンタクト情報を取得しBigQueryへ読み込む方法いかがでしたでしょうか。
最後までご覧いただきありがとうございます。