Eddie Atkinson
08/19/2021, 2:32 AM
from prefect.run_configs import ECSRun
from prefect import task, Flow
from prefect.storage import S3
import prefect

TASK_ARN = "an arn"
STORAGE = S3(bucket="my-cool-bucket")
LABELS = ["a label"]
RUN_CONFIG = ECSRun(
    labels=LABELS,
    task_role_arn=TASK_ARN,
    image="prefecthq/prefect:latest-python3.8",
    memory=512,
    cpu=256,
)

@task(log_stdout=True)
def bye_world():
    print("bye world")

@task(log_stdout=True)
def hello_world():
    print("Hello, World!")

with Flow("test_flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    hello_world()
    bye_world()

flow.register(project_name="Tesla")
Eddie Atkinson
08/19/2021, 2:32 AM
Eddie Atkinson
08/19/2021, 4:53 AM
[...] task_definition_arn in the call to ECSRun. I’m using the Serverless Framework, and here is the config for those who are interested:
tasks:
  prefect-task:
    name: ${self:custom.ecsTaskName}-prefect-task
    image: 'prefecthq/prefect:latest-python3.8'
    desired: 0
    cpu: 256 # 0.25 vCPU
    memory: 512 # 512 MB
    override:
      role: ${self:custom.ecsBaseTaskRoleName}
      container:
        Name: flow
        # container must be called 'flow' for prefect to run tasks
        # when using ECSRun: <https://docs.prefect.io/api/latest/run_configs.html#ecsrun>
        LogConfiguration:
          LogDriver: "awslogs"
          Options:
            awslogs-group: ${self:custom.ecsClusterLogGroupName}
            awslogs-region: ${self:custom.region}
            awslogs-stream-prefix: ${self:custom.ecsTaskLogPrefix}
With this flow:
RUN_CONFIG = ECSRun(
    labels=LABELS,
    task_definition_arn=TASK_DEFINITION_FAMILY,
    memory=512,
    cpu=256,
)

@task(log_stdout=True)
def bye_world():
    print("bye world")

@task(log_stdout=True)
def hello_world():
    print("Hello, World!")

with Flow("a_flow", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    hello_world()
    bye_world()
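
The config comment above says the container must be called 'flow' for ECSRun to work with a pre-registered task definition. A quick way to check that before registering a flow might look like the sketch below. It is only an illustration: the helper names (container_names, has_flow_container) and the sample dictionaries are hypothetical, and it assumes the task definition is available as a dict in the ECS JSON shape, with a containerDefinitions list whose entries carry a name key.

```python
from typing import Any, Dict, List

# Container name that the config comment above says ECSRun expects.
REQUIRED_CONTAINER_NAME = "flow"


def container_names(task_definition: Dict[str, Any]) -> List[str]:
    """List container names in an ECS-style task definition dict.

    Assumes the ECS JSON layout: a "containerDefinitions" list whose
    entries each have a "name" key. Missing keys yield empty results.
    """
    containers = task_definition.get("containerDefinitions", [])
    return [container.get("name", "") for container in containers]


def has_flow_container(task_definition: Dict[str, Any]) -> bool:
    """Return True if one of the containers is named 'flow'."""
    return REQUIRED_CONTAINER_NAME in container_names(task_definition)


# Hypothetical task definitions, for illustration only.
good = {
    "family": "prefect-task",
    "containerDefinitions": [
        {"name": "flow", "image": "prefecthq/prefect:latest-python3.8"}
    ],
}
bad = {
    "family": "prefect-task",
    "containerDefinitions": [
        {"name": "app", "image": "prefecthq/prefect:latest-python3.8"}
    ],
}

if __name__ == "__main__":
    print(has_flow_container(good))
    print(has_flow_container(bad))
```

If boto3 is available, the dict could presumably be taken from the "taskDefinition" key of the ECS client's describe_task_definition response, but that step is left out here to keep the sketch free of AWS credentials.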
Kevin Kho
Marvin
08/19/2021, 4:54 AM
Kevin Kho
Eddie Atkinson
08/19/2021, 5:02 AM
Kevin Kho