CA Lee
06/26/2021, 10:19 AM
`prefect register` from the CLI?
Storage: S3
Execution: ECS
Prefect version: 0.14.22
vscode ➜ /workspaces/flowstate/flows (main ✗) $ prefect register --project prefect_project_name -p project_path
Collecting flows...
Processing 'project_path/flow_one.py':
Building `S3` storage...
[2021-06-26 10:10:44+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_one/2021-06-26t10-10-44-047892-00-00 in aws-ecs-flows
Registering 'flow_one'... Done
└── ID: 41b7ea82-6cee-47c9-871c-0155267e6373
└── Version: 1
Processing 'project_path/flow_two.py':
Building `S3` storage...
[2021-06-26 10:10:45+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_one/2021-06-26t10-10-44-047892-00-00 in aws-ecs-flows
-------------------- * SAME FILE * --------------------
[2021-06-26 10:10:45+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_two/2021-06-26t10-10-45-180059-00-00 in aws-ecs-flows
Registering 'flow_two'... Done
└── ID: f03469a7-c463-486f-a032-53d0fd4f265c
└── Version: 1
======================== 2 registered ========================
This results in an error when attempting to execute `flow_two`:
Failed to load and execute Flow's environment: ValueError("Flow 'flow_two' not found in file. Found flows:\n- 'flow_one'")
Kevin Kho
Does `flow_one.py` have 2 flows in it?
CA Lee
06/28/2021, 1:59 AM
CA Lee
06/28/2021, 2:02 AM
CA Lee
06/28/2021, 2:02 AM
flow_one.py
from src.my_modules import extract, transform, load
from prefect import task, Flow
from prefect.schedules import CronSchedule
from src.utils import S3_STORAGE, ECS_RUN_CONFIG

daily_recurring_1h = CronSchedule("30 1-10 * * 1-6")

# --------------------------------------------------- #
# Extract
# --------------------------------------------------- #
@task(nout=4, log_stdout=True)
def extract():
    do_something()

# --------------------------------------------------- #
# Transform
# --------------------------------------------------- #
@task(nout=4, log_stdout=True)
def transform():
    do_something()

# --------------------------------------------------- #
# Load
# --------------------------------------------------- #
@task(log_stdout=True)
def load():
    do_something()

# --------------------------------------------------- #
# Workflow
# --------------------------------------------------- #
FLOW_NAME = "flow one"
with Flow(
    FLOW_NAME,
    storage=S3_STORAGE,
    run_config=ECS_RUN_CONFIG,
    schedule=daily_recurring_1h) as flow:
    extract()
    transform()
    load()
CA Lee
06/28/2021, 2:02 AM
flow_two.py
from datetime import timedelta  # needed by IntervalSchedule below
from src.my_module import my_func
from prefect import Flow
from prefect.schedules import IntervalSchedule
from src.utils import S3_STORAGE, ECS_RUN_CONFIG

recurring_90m = IntervalSchedule(interval=timedelta(minutes=90))

FLOW_NAME = "flow two"
with Flow(
    FLOW_NAME,
    storage=S3_STORAGE,
    run_config=ECS_RUN_CONFIG,
    schedule=recurring_90m) as flow:
    my_func()
CA Lee
06/28/2021, 2:03 AM
`S3_STORAGE` and `ECS_RUN_CONFIG` definitions:
src.utils.py
# Run configuration
from prefect.storage import S3
from prefect.run_configs import ECSRun

# Default bucket = aws-ecs-flows
S3_STORAGE = S3(
    bucket="aws-ecs-flows",
    stored_as_script=True
)

# We use this Docker image as the default to run our flows
ECS_RUN_CONFIG = ECSRun(
    image="xxxxxxxxxx.dkr.ecr.ap-southeast-1.amazonaws.com/aws-ecs-flows:latest",
    labels=['ecs']
)
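Both flow files do `from src.utils import S3_STORAGE`, and Python caches a module after its first import, so every flow file loaded in the same process receives the same storage object rather than a fresh copy. A minimal sketch of that behaviour, using a hypothetical in-memory module named `toy_utils` (not the real `src.utils`) and a plain dict standing in for the `S3` object:

```python
import importlib
import sys
import types

# Hypothetical stand-in for src/utils.py, built in memory so the sketch is self-contained.
toy_utils = types.ModuleType("toy_utils")
toy_utils.S3_STORAGE = {"bucket": "aws-ecs-flows", "flows": []}  # dict stands in for S3(...)
sys.modules["toy_utils"] = toy_utils


def load_storage_like_a_flow_file():
    """Mimic `from toy_utils import S3_STORAGE` at the top of a flow file."""
    module = importlib.import_module("toy_utils")  # served from the sys.modules cache
    return module.S3_STORAGE


storage_seen_by_flow_one = load_storage_like_a_flow_file()
storage_seen_by_flow_two = load_storage_like_a_flow_file()

# Both "flow files" hold the very same object, so state recorded for one
# is visible to the other.
storage_seen_by_flow_one["flows"].append("flow_one")
print(storage_seen_by_flow_one is storage_seen_by_flow_two)  # True
print(storage_seen_by_flow_two["flows"])                     # ['flow_one']
```

Whether this sharing is what causes the duplicate upload depends on how the storage object tracks flows internally, which the sketch does not model.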
Kevin Kho
CA Lee
06/29/2021, 2:54 AM
CA Lee
06/30/2021, 7:30 AM
# Run configuration
from prefect.storage import S3
from prefect.run_configs import ECSRun
S3_STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
ECS_RUN_CONFIG = ECSRun(image="xxxx.dkr.ecr.ap-southeast-1.amazonaws.com/aws-ecs-flows:latest", labels=['ecs'])
instead of
from src.utils import S3_STORAGE, ECS_RUN_CONFIG
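A plausible reading of why defining the storage in each flow file helps, offered as an assumption rather than a confirmed diagnosis: if a single storage object remembers the first script path it was given and keeps a list of every flow attached to it, then registering a second flow with that same object would upload the first script again under the second flow's name. The sketch below models this with a hypothetical `ToyStorage` class. It is not Prefect's implementation; the names `add_flow`, `build` and `local_script_path` are chosen only to make the toy readable.

```python
class ToyStorage:
    """Toy stand-in for a script-based storage object (not Prefect's S3 class)."""

    def __init__(self):
        self.local_script_path = None  # script to upload, remembered once set
        self.flows = []                # names of flows attached to this storage

    def add_flow(self, flow_name, script_path):
        # Assumed behaviour: keep the first script path that was set.
        if self.local_script_path is None:
            self.local_script_path = script_path
        self.flows.append(flow_name)

    def build(self):
        # "Upload" the remembered script once per attached flow.
        return [(self.local_script_path, name) for name in self.flows]


def register(flows, storage_for):
    """Register (flow_name, script_path) pairs; return the uploads of the last build."""
    uploads = []
    for flow_name, script_path in flows:
        storage = storage_for(flow_name)
        storage.add_flow(flow_name, script_path)
        uploads = storage.build()
    return uploads


flows = [("flow_one", "flow_one.py"), ("flow_two", "flow_two.py")]

shared = ToyStorage()
shared_uploads = register(flows, lambda name: shared)          # one module-level object
separate_uploads = register(flows, lambda name: ToyStorage())  # one object per flow file

print(shared_uploads)    # [('flow_one.py', 'flow_one'), ('flow_one.py', 'flow_two')]
print(separate_uploads)  # [('flow_two.py', 'flow_two')]
```

With the shared object, the toy reproduces the pattern in the registration log: `flow_one.py` is uploaded for both flows while `flow_two.py` is being processed. With one object per flow, each flow's own script is uploaded.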
Kevin Kho