こんにちは、香田です。
今回はPrefectを使用しワークフローをECS Fargateで実行する方法について紹介していきます。
ワークフローの管理機能としてはPrefect Cloudを利用する為、事前に下記より無料アカウントを作成してみてください。
Prefectについて
Prefectについて簡単に紹介すると、PrefectはPythonでワークフローを記述することができるワークフロー管理ツールです。
Prefectでワークフローを作成する際、Pythonスクリプトとして下記のように関数をタスクとして定義し、それらを呼び出すフローを定義していきます。
from prefect import task, Flow, Parameter
@task(log_stdout=True)
def say_hello(name):
print("Hello, {}!".format(name))
with Flow("My First Flow") as flow:
name = Parameter('name')
say_hello(name)
flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"
フローの状態管理やスケジュール実行といった機能が、バックエンドサーバーとしてオープンソースのPrefect Core server、クラウドサービスのPrefect Cloudの2つで提供されています。
ワークフローはバックエンドサーバーよりPrefect Agentによってタスクが実行される流れとなります。
Prefectのインストール
はじめにPrefectをローカル開発環境にインストールします。
作業ディレクトリを作成します。
mkdir prefect-tutorial
cd prefect-tutorial
venv仮想環境を初期化します。
python -m venv venv
source venv/bin/activate
Prefectをインストールします。
pip install prefect[aws]==0.14.22
Prefect プロジェクトの作成
次にPrefect Cloudにプロジェクトを作成していきます。
Prefect CLIで操作できるようにPrefect CloudにてAPI KEYを発行します。
右上隅のユーザーメニューより[Account Settings]、[API Keys]より[CREATE AN API KEY]をクリックし、API KEYを作成しコピーします。
Prefect バックエンドを設定します。
prefect backend cloud
作成したAPI KEYを環境変数に設定します。
export PREFECT__CLOUD__AUTH_TOKEN=<API KEY>
Prefect Cloudにプロジェクトを作成します。
prefect create project tutorial
ECS 環境の作成
つぎにPrefect 実行環境用のECS クラスターの作成とECS タスクで利用するCloudWatch Logsを作成します。
ECS クラスターを作成します。
aws ecs create-cluster --cluster-name ecs-cluster
ECS タスク用のCloudWatch Logs ロググループを作成します。
aws logs create-log-group --log-group-name /ecs/prefect
IAM ロールの作成
次にECSで利用するIAM ロールとして、「ECS タスク実行 IAM ロール」と「タスク用のIAM ロール」を作成していきます。
ECS タスク実行 IAM ロールはECSタスクにてコンテナイメージ取得、CloudWatch Logsへログ保存等を許可する為に利用されるIAM Roleになります。
ECS タスク実行 IAM ロールは下記を参考にecsTaskExecutionRole
という名前で作成してください。
タスク用のIAM ロールはタスクのコンテナで使用するIAM ロールになります。
タスク用のIAM ロールは下記を参考にecsTaskRole
という名前で作成します。
タスク用のIAM ロールへ適用するポリシーは、下記を適用してください。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"ec2:AuthorizeSecurityGroupIngress",
"ec2:CreateSecurityGroup",
"ec2:CreateTags",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"ec2:DeleteSecurityGroup",
"ecs:CreateCluster",
"ecs:DescribeTasks",
"ecs:ListAccountSettings",
"ecs:RegisterTaskDefinition",
"ecs:RunTask",
"ecs:StopTask",
"ecs:ListClusters",
"ecs:DescribeClusters",
"ecs:DeleteCluster",
"ecs:ListTaskDefinitions",
"ecs:DescribeTaskDefinition",
"ecs:DeregisterTaskDefinition",
"iam:AttachRolePolicy",
"iam:CreateRole",
"iam:TagRole",
"iam:PassRole",
"iam:DeleteRole",
"iam:ListRoles",
"iam:ListRoleTags",
"iam:ListAttachedRolePolicies",
"iam:DetachRolePolicy",
"logs:DescribeLogGroups",
"logs:GetLogEvents",
"logs:CreateLogGroup",
"logs:PutRetentionPolicy",
"s3:Get*",
"s3:List*"
],
"Resource": "*"
}
]
}
Prefect ECS Agent タスク定義の登録
次にPrefect ECS Agentで利用するタスク定義を作成します。
下記をコピーしprefect-agent-task.json
という名前で保存します。
タスク定義で指定されているenvironment
のPREFECT__CLOUD__AGENT__AUTH_TOKEN
の箇所は作成したAPI KEYを設定してください。
{
"family": "prefect-agent",
"requiresCompatibilities": [
"FARGATE"
],
"networkMode": "awsvpc",
"cpu": "512",
"memory": "1024",
"taskRoleArn": "ecsTaskRole",
"executionRoleArn": "ecsTaskExecutionRole",
"containerDefinitions": [
{
"name": "prefect-agent",
"image": "prefecthq/prefect:0.14.22-python3.8",
"essential": true,
"command": [
"prefect",
"agent",
"ecs",
"start"
],
"environment": [
{
"name": "PREFECT__CLOUD__AGENT__AUTH_TOKEN",
"value": "<API KEY>"
},
{
"name": "PREFECT__CLOUD__AGENT__LABELS",
"value": "['workflow']"
},
{
"name": "PREFECT__CLOUD__AGENT__LEVEL",
"value": "INFO"
},
{
"name": "PREFECT__CLOUD__API",
"value": "https://api.prefect.io"
}
],
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/prefect",
"awslogs-region": "ap-northeast-1",
"awslogs-stream-prefix": "agent",
"awslogs-create-group": "true"
}
}
}
]
}
Prefect ECS Agentのタスク定義を登録します。
aws ecs register-task-definition \
--cli-input-json file://$PWD/prefect-agent-task.json
Prefect ECS Agent ECS サービスの作成
次にPrefect ECS AgentのECS サービスをFargateで起動するように作成します。
下記をコピーしprefect-agent-service.json
という名前で保存します。
awsvpcConfiguration
のsubnets
とsecurigyGroups
は適宜設定してください。
デフォルトで作成されているVPCのサブネットとセキュリティグループを利用しても問題なく実行できるはずです。
{
"cluster": "ecs-cluster",
"serviceName": "prefect-agent",
"taskDefinition": "prefect-agent",
"launchType": "FARGATE",
"schedulingStrategy": "REPLICA",
"platformVersion": "LATEST",
"networkConfiguration": {
"awsvpcConfiguration": {
"assignPublicIp": "ENABLED",
"securityGroups": [
"sg-xxx"
],
"subnets": [
"subnet-xxx",
"subnet-xxx",
"subnet-xxx"
]
}
},
"desiredCount": 1,
"enableExecuteCommand": true
}
Prefect ECS Agent ECS サービスを作成します。
aws ecs create-service \
--cli-input-json file://$PWD/prefect-agent-service.json
Prefect ECS Agentの起動は下記のようにCloudwatch Logsより確認可能です。
aws logs tail --follow /ecs/prefect
Prefect Flow Dockerfileの作成
Prefect FlowをECSで実行させるためにDockerfile
を作成します。
FROM prefecthq/prefect:0.14.22-python3.8
RUN pip install prefect[aws]
Prefect Flow ECRへイメージ登録
作成したイメージをビルドしECRへ登録していきます。
ECRへログインします。
export ECR_URI_BASE=$(aws sts get-caller-identity --query Account --output text).dkr.ecr.ap-northeast-1.amazonaws.com
aws ecr get-login-password | docker login --username AWS --password-stdin $ECR_URI_BASE
ECR リポジトリを作成します。
aws ecr create-repository --repository-name flow-aws
イメージをビルドし、ECR リポジトリへ登録します。
docker build -t flow-aws:latest .
docker tag flow-aws:latest $ECR_URI_BASE/flow-aws:latest
docker push $ECR_URI_BASE/flow-aws:latest
Prefect Flow タスク定義の登録
Prefect Flowで利用するタスク定義を作成します。
下記をコピーしprefect-flow-aws-task.json
という名前で保存します。
タスク定義で指定されているimage
のAWS アカウントID
の箇所は適宜修正してください。
{
"family": "flow-aws",
"requiresCompatibilities": [
"FARGATE"
],
"networkMode": "awsvpc",
"cpu": "256",
"memory": "512",
"taskRoleArn": "ecsTaskRole",
"executionRoleArn": "ecsTaskExecutionRole",
"containerDefinitions": [
{
"name": "flow",
"image": "<AWS アカウトID>.dkr.ecr.ap-northeast-1.amazonaws.com/flow-aws:latest",
"essential": true,
"logConfiguration": {
"logDriver": "awslogs",
"options": {
"awslogs-group": "/ecs/prefect",
"awslogs-region": "ap-northeast-1",
"awslogs-stream-prefix": "flow",
"awslogs-create-group": "true"
}
}
}
]
}
Prefect Flowのタスク定義を登録します。
aws ecs register-task-definition \
--cli-input-json file://$PWD/prefect-flow-aws-task.json
Prefect Flowの作成
作成したECS タスクにて実行されるPrefect Flowを作成していきます。
下記をコピーしecs-fargate-flow.py
という名前で保存します。
下記のsubnets
、securityGroups
、s3 bucket name
は適宜設定してください。
また、下記のFlowで利用しているECSRun
の詳細については、こちらを参照してみてください。
import prefect
from prefect import task, Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3
RUN_CONFIG = ECSRun(
run_task_kwargs={
"cluster": "ecs-cluster",
"launchType": "FARGATE",
"networkConfiguration":
{'awsvpcConfiguration': {
'assignPublicIp': 'ENABLED',
'subnets': [
'subnet-xxx', 'subnet-xxx', 'subnet-xxx'
],
'securityGroups': ['sg-xxx']
}},
},
task_definition_arn="flow-aws",
labels=["workflow"]
)
@task
def say_hello():
logger = prefect.context.get("logger")
logger.info("Hello, ECS Fargate")
with Flow("ecs-fargate-flow",
run_config=RUN_CONFIG,
storage=S3(bucket="<s3 bucket name>")) as flow:
say_hello()
flow.register(project_name="tutorial", add_default_labels=False)
Prefect Flowの実行
作成したFlowをPrefect Cloudへ登録します。
python ecs-fargate-flow.py
成功するとPrefect Cloudの[FLOWS]へ下記のように登録されているはずです。
Flowを実行します。
prefect run flow --name "ecs-fargate-flow" --project tutorial
Prefect Flowの実行結果はCloudWatch Logsより確認可能です。
aws logs tail --follow /ecs/prefect
ECS クラスターのタスクをみると、起動タイプとしてFARGATEで起動していることが確認できます。
Prefect CloudよりFlowの概要や実行したタスクのログも確認可能です。
- FLOW OVERVIEW
- TASK RUN LOGS
さいごに
Prefectを使用しワークフローをECS Fargateで実行する方法いかがでしたしょうか?
Prefect AgentやPrefect Flowの実行環境は、Kubernetesも同様にサポートされている為、こちらも気になるところですね。
PrefectをECS Fargateで利用する際、本記事が参考になれば幸いでございます。
最後までご覧いただきありがとうございます。