Prefectを使用しワークフローをECS Fargateで実行する

こんにちは、香田です。

今回はPrefectを使用しワークフローをECS Fargateで実行する方法について紹介していきます。

ワークフローの管理機能としてはPrefect Cloudを利用する為、事前に下記より無料アカウントを作成してみてください。

Prefectについて

Prefectについて簡単に紹介すると、PrefectはPythonでワークフローを記述することができるワークフロー管理ツールです。

Prefectでワークフローを作成する際、Pythonスクリプトとして下記のように関数をタスクとして定義し、それらを呼び出すフローを定義していきます。

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

フローの状態管理やスケジュール実行といった機能が、バックエンドサーバーとしてオープンソースのPrefect Core server、クラウドサービスのPrefect Cloudの2つで提供されています。

ワークフローはバックエンドサーバーよりPrefect Agentによってタスクが実行される流れとなります。

Prefectのインストール

はじめにPrefectをローカル開発環境にインストールします。

作業ディレクトリを作成します。

mkdir prefect-tutorial
cd prefect-tutorial

venv仮想環境を初期化します。

python -m venv venv
source venv/bin/activate

Prefectをインストールします。

pip install prefect[aws]==0.14.22

Prefect プロジェクトの作成

次にPrefect Cloudにプロジェクトを作成していきます。

Prefect CLIで操作できるようにPrefect CloudにてAPI KEYを発行します。

右上隅のユーザーメニューより[Account Settings]、[API Keys]より[CREATE AN API KEY]をクリックし、API KEYを作成しコピーします。

Prefect バックエンドを設定します。

prefect backend cloud

作成したAPI KEYを環境変数に設定します。

export PREFECT__CLOUD__AUTH_TOKEN=<API KEY>

Prefect Cloudにプロジェクトを作成します。

prefect create project tutorial

ECS 環境の作成

つぎにPrefect 実行環境用のECS クラスターの作成とECS タスクで利用するCloudWatch Logsを作成します。

ECS クラスターを作成します。

aws ecs create-cluster --cluster-name ecs-cluster

ECS タスク用のCloudWatch Logs ロググループを作成します。

aws logs create-log-group --log-group-name /ecs/prefect

IAM ロールの作成

次にECSで利用するIAM ロールとして、「ECS タスク実行 IAM ロール」と「タスク用のIAM ロール」を作成していきます。

ECS タスク実行 IAM ロールはECSタスクにてコンテナイメージ取得、CloudWatch Logsへログ保存等を許可する為に利用されるIAM Roleになります。

ECS タスク実行 IAM ロールは下記を参考にecsTaskExecutionRoleという名前で作成してください。

タスク用のIAM ロールはタスクのコンテナで使用するIAM ロールになります。

タスク用のIAM ロールは下記を参考にecsTaskRoleという名前で作成します。

タスク用のIAM ロールへ適用するポリシーは、下記を適用してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DeleteSecurityGroup",
                "ecs:CreateCluster",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:DeleteCluster",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:AttachRolePolicy",
                "iam:CreateRole",
                "iam:TagRole",
                "iam:PassRole",
                "iam:DeleteRole",
                "iam:ListRoles",
                "iam:ListRoleTags",
                "iam:ListAttachedRolePolicies",
                "iam:DetachRolePolicy",
                "logs:DescribeLogGroups",
                "logs:GetLogEvents",
                "logs:CreateLogGroup",
                "logs:PutRetentionPolicy",
                "s3:Get*",
                "s3:List*"
            ],
            "Resource": "*"
        }
    ]
}

Prefect ECS Agent タスク定義の登録

次にPrefect ECS Agentで利用するタスク定義を作成します。

下記をコピーしprefect-agent-task.jsonという名前で保存します。

タスク定義で指定されているenvironmentPREFECT__CLOUD__AGENT__AUTH_TOKENの箇所は作成したAPI KEYを設定してください。

{
    "family": "prefect-agent",
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "networkMode": "awsvpc",
    "cpu": "512",
    "memory": "1024",
    "taskRoleArn": "ecsTaskRole",
    "executionRoleArn": "ecsTaskExecutionRole",
    "containerDefinitions": [
        {
            "name": "prefect-agent",
            "image": "prefecthq/prefect:0.14.22-python3.8",
            "essential": true,
            "command": [
                "prefect",
                "agent",
                "ecs",
                "start"
            ],
            "environment": [
                {
                    "name": "PREFECT__CLOUD__AGENT__AUTH_TOKEN",
                    "value": "<API KEY>"
                },
                {
                    "name": "PREFECT__CLOUD__AGENT__LABELS",
                    "value": "['workflow']"
                },
                {
                    "name": "PREFECT__CLOUD__AGENT__LEVEL",
                    "value": "INFO"
                },
                {
                    "name": "PREFECT__CLOUD__API",
                    "value": "https://api.prefect.io"
                }
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/prefect",
                    "awslogs-region": "ap-northeast-1",
                    "awslogs-stream-prefix": "agent",
                    "awslogs-create-group": "true"
                }
            }
        }
    ]
}

Prefect ECS Agentのタスク定義を登録します。

aws ecs register-task-definition \
--cli-input-json file://$PWD/prefect-agent-task.json

Prefect ECS Agent ECS サービスの作成

次にPrefect ECS AgentのECS サービスをFargateで起動するように作成します。

下記をコピーしprefect-agent-service.jsonという名前で保存します。

awsvpcConfigurationsubnetssecurigyGroupsは適宜設定してください。

デフォルトで作成されているVPCのサブネットとセキュリティグループを利用しても問題なく実行できるはずです。

{
    "cluster": "ecs-cluster",
    "serviceName": "prefect-agent",
    "taskDefinition": "prefect-agent",
    "launchType": "FARGATE",
    "schedulingStrategy": "REPLICA",
    "platformVersion": "LATEST",
    "networkConfiguration": {
        "awsvpcConfiguration": {
            "assignPublicIp": "ENABLED",
            "securityGroups": [
                "sg-xxx"
            ],
            "subnets": [
                "subnet-xxx",
                "subnet-xxx",
                "subnet-xxx"
            ]
        }
    },
    "desiredCount": 1,
    "enableExecuteCommand": true
}

Prefect ECS Agent ECS サービスを作成します。

aws ecs create-service \
--cli-input-json file://$PWD/prefect-agent-service.json

Prefect ECS Agentの起動は下記のようにCloudwatch Logsより確認可能です。

aws logs tail --follow /ecs/prefect

Prefect Flow Dockerfileの作成

Prefect FlowをECSで実行させるためにDockerfileを作成します。

FROM prefecthq/prefect:0.14.22-python3.8
RUN pip install prefect[aws]

Prefect Flow ECRへイメージ登録

作成したイメージをビルドしECRへ登録していきます。

ECRへログインします。

export ECR_URI_BASE=$(aws sts get-caller-identity --query Account --output text).dkr.ecr.ap-northeast-1.amazonaws.com
aws ecr get-login-password | docker login --username AWS --password-stdin $ECR_URI_BASE

ECR リポジトリを作成します。

aws ecr create-repository --repository-name flow-aws

イメージをビルドし、ECR リポジトリへ登録します。

docker build -t flow-aws:latest .
docker tag flow-aws:latest $ECR_URI_BASE/flow-aws:latest
docker push $ECR_URI_BASE/flow-aws:latest

Prefect Flow タスク定義の登録

Prefect Flowで利用するタスク定義を作成します。

下記をコピーしprefect-flow-aws-task.jsonという名前で保存します。

タスク定義で指定されているimageAWS アカウントIDの箇所は適宜修正してください。

{
    "family": "flow-aws",
    "requiresCompatibilities": [
        "FARGATE"
    ],
    "networkMode": "awsvpc",
    "cpu": "256",
    "memory": "512",
    "taskRoleArn": "ecsTaskRole",
    "executionRoleArn": "ecsTaskExecutionRole",
    "containerDefinitions": [
        {
            "name": "flow",
            "image": "<AWS アカウトID>.dkr.ecr.ap-northeast-1.amazonaws.com/flow-aws:latest",
            "essential": true,
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/prefect",
                    "awslogs-region": "ap-northeast-1",
                    "awslogs-stream-prefix": "flow",
                    "awslogs-create-group": "true"
                }
            }
        }
    ]
}

Prefect Flowのタスク定義を登録します。

aws ecs register-task-definition \
--cli-input-json file://$PWD/prefect-flow-aws-task.json

Prefect Flowの作成

作成したECS タスクにて実行されるPrefect Flowを作成していきます。

下記をコピーしecs-fargate-flow.pyという名前で保存します。

下記のsubnetssecurityGroupss3 bucket nameは適宜設定してください。

また、下記のFlowで利用しているECSRunの詳細については、こちらを参照してみてください。

import prefect
from prefect import task, Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3


RUN_CONFIG = ECSRun(
    run_task_kwargs={
        "cluster": "ecs-cluster",
        "launchType": "FARGATE",
        "networkConfiguration":
            {'awsvpcConfiguration': {
                'assignPublicIp': 'ENABLED',
                'subnets': [
                    'subnet-xxx', 'subnet-xxx', 'subnet-xxx'
                ],
                'securityGroups': ['sg-xxx']
            }},
    },
    task_definition_arn="flow-aws",
    labels=["workflow"]
)


@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, ECS Fargate")


with Flow("ecs-fargate-flow",
          run_config=RUN_CONFIG,
          storage=S3(bucket="<s3 bucket name>")) as flow:
    say_hello()

flow.register(project_name="tutorial", add_default_labels=False)

Prefect Flowの実行

作成したFlowをPrefect Cloudへ登録します。

python ecs-fargate-flow.py

成功するとPrefect Cloudの[FLOWS]へ下記のように登録されているはずです。

Flowを実行します。

prefect run flow --name "ecs-fargate-flow" --project tutorial

Prefect Flowの実行結果はCloudWatch Logsより確認可能です。

aws logs tail --follow /ecs/prefect

ECS クラスターのタスクをみると、起動タイプとしてFARGATEで起動していることが確認できます。

Prefect CloudよりFlowの概要や実行したタスクのログも確認可能です。

  • FLOW OVERVIEW
  • TASK RUN LOGS

さいごに

Prefectを使用しワークフローをECS Fargateで実行する方法いかがでしたしょうか?

Prefect AgentやPrefect Flowの実行環境は、Kubernetesも同様にサポートされている為、こちらも気になるところですね。

PrefectをECS Fargateで利用する際、本記事が参考になれば幸いでございます。

最後までご覧いただきありがとうございます。

SNSでもご購読できます。