# ask-community
c
Hello, has anyone run into Prefect registering the same script twice when calling `prefect register` from the CLI?
Storage: S3
Execution: ECS
Prefect version: 0.14.22
vscode ➜ /workspaces/flowstate/flows (main ✗) $ prefect register --project prefect_project_name -p project_path

Collecting flows...

Processing 'project_path/flow_one.py':
  Building `S3` storage...
[2021-06-26 10:10:44+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_one/2021-06-26t10-10-44-047892-00-00 in aws-ecs-flows
  Registering 'flow_one'... Done
  └── ID: 41b7ea82-6cee-47c9-871c-0155267e6373
  └── Version: 1

Processing 'project_path/flow_two.py':
  Building `S3` storage...
[2021-06-26 10:10:45+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_one/2021-06-26t10-10-44-047892-00-00 in aws-ecs-flows
--------------------------- * SAME FILE (duplicate of the upload above) * ---------------------------
[2021-06-26 10:10:45+0000] INFO - prefect.S3 | Uploading script /workspaces/github_project/flows/prefect_project_name/flow_one.py to flow_two/2021-06-26t10-10-45-180059-00-00 in aws-ecs-flows
  Registering 'flow_two'... Done
  └── ID: f03469a7-c463-486f-a032-53d0fd4f265c
  └── Version: 1

======================== 2 registered ========================
This results in an error when attempting to execute `flow_two`:
Failed to load and execute Flow's environment: ValueError("Flow 'flow_two' not found in file. Found flows:\n- 'flow_one'")
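For intuition on the error: at run time the agent downloads the script from the flow's storage location and looks the flow up by name. A minimal pure-Python stand-in (not Prefect's actual implementation; `Flow` and `extract_flow_from_source` below are simplified toys) shows why pointing flow_two's run at an upload of flow_one.py fails:

```python
# Toy stand-in for how a script-stored flow is loaded at run time: the
# agent executes the downloaded file, collects the Flow objects it
# defines, and looks up the requested one by name.

class Flow:
    # simplified stand-in for prefect.Flow
    def __init__(self, name):
        self.name = name

def extract_flow_from_source(source, flow_name):
    namespace = {"Flow": Flow}
    exec(source, namespace)  # run the downloaded script
    flows = {o.name: o for o in namespace.values() if isinstance(o, Flow)}
    if flow_name not in flows:
        raise ValueError(
            "Flow '%s' not found in file. Found flows:\n- %s"
            % (flow_name, "\n- ".join("'%s'" % n for n in sorted(flows)))
        )
    return flows[flow_name]

# flow_one.py was uploaded under flow_two's storage key, so the agent
# downloads a script that only defines 'flow_one':
downloaded = "flow = Flow('flow_one')"
try:
    extract_flow_from_source(downloaded, "flow_two")
except ValueError as err:
    print(err)
```

The lookup itself is fine; the problem is that the wrong bytes sit under flow_two's key in S3.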
k
Hi @CA Lee, would you be able to share your script here? Does `flow_one.py` have 2 flows in it?
c
Hey @Kevin Kho, thanks for helping out. Each script has only one Flow object; I'll paste the edited contents of both scripts here soon.
I edited out all the script content, leaving the skeleton structure
flow_one.py
from prefect import task, Flow
from prefect.schedules import CronSchedule
from src.utils import S3_STORAGE, ECS_RUN_CONFIG

daily_recurring_1h = CronSchedule("30 1-10 * * 1-6")

# --------------------------------------------------- #
# Extract
# --------------------------------------------------- #

@task(nout=4, log_stdout=True)
def extract():
    do_something()
    
# --------------------------------------------------- #
# Transform
# --------------------------------------------------- #

@task(nout=4, log_stdout=True)
def transform():
    do_something()

# --------------------------------------------------- #
# Load
# --------------------------------------------------- #

@task(log_stdout=True)
def load():
    do_something()

# --------------------------------------------------- #
# Workflow
# --------------------------------------------------- #

FLOW_NAME = "flow one"

with Flow(
    FLOW_NAME,
    storage=S3_STORAGE,
    run_config=ECS_RUN_CONFIG,
    schedule=daily_recurring_1h) as flow:

    extract()
    transform()
    load()
flow_two.py
from src.my_module import my_func

from datetime import timedelta
from prefect import Flow
from prefect.schedules import IntervalSchedule
from src.utils import S3_STORAGE, ECS_RUN_CONFIG

recurring_90m = IntervalSchedule(interval=timedelta(minutes=90))

FLOW_NAME = "flow two"

with Flow(
    FLOW_NAME,
    storage=S3_STORAGE,
    run_config=ECS_RUN_CONFIG,
    schedule=recurring_90m) as flow:

    my_func()
`S3_STORAGE` and `ECS_RUN_CONFIG` definitions:
src/utils.py
# Run configuration
from prefect.storage import S3
from prefect.run_configs import ECSRun

# Default bucket = aws-ecs-flows

S3_STORAGE = S3(
    bucket="aws-ecs-flows",
    stored_as_script=True
)

# We use this Docker image as the default to run our flows

ECS_RUN_CONFIG = ECSRun(
    image="xxxxxxxxxx.dkr.ecr.ap-southeast-1.amazonaws.com/aws-ecs-flows:latest",
    labels=['ecs']
)
k
Do you get the same behavior when registering just one file instead of the directory?
I think I replicated. Will ask the team.
There does seem to be some bug here. We’ll take a look and open an issue.
The issue to follow for this is here.
c
Hey @Kevin Kho, can confirm that registering one file at a time works, while registering the directory does not: with a single file, the correct script gets uploaded to S3. If a directory path is supplied to the `prefect register -p` flag, only the first file in the directory is uploaded to S3.
@Kevin Kho Can also confirm that if the S3 storage and ECS run config are defined within the flow file itself, instead of being imported from another module, the flows register fine. However, this would lead to a lot of repeated code, i.e. these lines added to the top of each file:
# Run configuration
from prefect.storage import S3
from prefect.run_configs import ECSRun

S3_STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
ECS_RUN_CONFIG = ECSRun(image="xxxx.dkr.ecr.ap-southeast-1.amazonaws.com/aws-ecs-flows:latest", labels=['ecs'])
instead of
from src.utils import S3_STORAGE, ECS_RUN_CONFIG
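One way to keep the shared module without sharing a single mutable Storage instance would be factory functions in src/utils.py, so each flow file gets its own fresh object. This is a hypothetical workaround sketch: `S3Storage` below is a toy stand-in for `prefect.storage.S3`, used only to show the pattern, and whether this actually sidesteps the registration bug is untested here.

```python
# Hypothetical factory-function variant of src/utils.py. Module-level
# constants are shared objects; factories return fresh ones per caller.

class S3Storage:
    # toy stand-in for prefect.storage.S3; real Storage objects track
    # the flows added to them, which is why sharing one instance across
    # flow files can go wrong
    def __init__(self, bucket, stored_as_script=True):
        self.bucket = bucket
        self.stored_as_script = stored_as_script
        self.flows = {}

def make_s3_storage():
    # Each call returns an independent instance, so two flow files can
    # never end up mutating the same storage object.
    return S3Storage(bucket="aws-ecs-flows")

storage_one = make_s3_storage()
storage_two = make_s3_storage()
assert storage_one is not storage_two
assert storage_one.bucket == storage_two.bucket == "aws-ecs-flows"
```

With the real Prefect classes, each flow file would call `make_s3_storage()` (and a matching run-config factory) instead of importing the shared `S3_STORAGE` constant.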
k
Yeah, the imported storage getting mixed up between the two flows is a bug on our side; the issue has been opened, so the core team is aware of it.