Myles Steinhauser

03/25/2022, 5:58 PM
I’m trying to follow the Scheduling a Flow-of-Flows example in the docs, but I keep running into the following error:
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
More details in thread.
Parent Flow:
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()


    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id="1c85cc1e-7e8c-46c9-86a2-d4beb75ea9ce")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)

    flow_b = create_flow_run(flow_name="B", project_name="myles")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)

    flow_c = create_flow_run(flow_name="C", project_name="myles")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)

    flow_d = create_flow_run(flow_name="D", project_name="myles")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(wait_for_flow_a)
    flow_c.set_upstream(wait_for_flow_a)
    flow_d.set_upstream(wait_for_flow_b)
    flow_d.set_upstream(wait_for_flow_c)
Child Flow (same across all 4 child flows):
@task
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
        #   run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    say_hi()
Based on the logs available to me, the parent flow always fails at the create_flow_run() call, regardless of whether I specify a flow_id or a flow_name and project_name.
The child flows all run correctly when run on their own, but the parent flow always fails to create them as children.
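For readers following along, here is a minimal sketch of the kind of "exactly one identifier" guard that the error message describes. It is an illustration only, not Prefect's actual source: the function name pick_flow_identifier and the second error message are hypothetical. It shows why the error is surprising here, since every call above passes only one of the two identifiers.

```python
from typing import Optional


def pick_flow_identifier(
    flow_id: Optional[str] = None, flow_name: Optional[str] = None
) -> str:
    """Hypothetical stand-in for an 'exactly one identifier' check."""
    # Both identifiers set: ambiguous, so refuse (message copied from the thread).
    if flow_id and flow_name:
        raise ValueError(
            "Received both `flow_id` and `flow_name`. "
            "Only one flow identifier can be passed."
        )
    # Neither identifier set: nothing to look up (hypothetical message).
    if not flow_id and not flow_name:
        raise ValueError("Neither `flow_id` nor `flow_name` was given.")
    # Exactly one is set; return whichever it is.
    return flow_id or flow_name


# Passing a single identifier, as the flows above do, passes this guard.
print(pick_flow_identifier(flow_name="B"))
```

With a guard like this, a call such as create_flow_run(flow_name="B", project_name="myles") should never trip the first branch, which suggests the problem lies somewhere other than the arguments themselves.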

Kevin Kho

03/25/2022, 6:08 PM
This is weird. Does flow_a start here? What happens if you do this?
flow_b = create_flow_run(flow_id=None, flow_name="B", project_name="myles")
And what is your Prefect version?

Myles Steinhauser

03/25/2022, 6:10 PM
Nope, flow_a never appears to start. I’ll try that now.
Running on prefect-1.1.0 with Prefect Cloud connected to ECSAgent.
Nothing changes, all of the same failures:
└── 14:18:05 | ERROR   | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/myles/Library/Caches/pypoetry/virtualenvs/saasworks-py-ScOzV3iT-py3.10/lib/python3.10/site-packages/prefect/tasks/prefect/flow_run.py", line 115, in create_flow_run
ValueError: Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.
Stepping back to an even simpler test case
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
Still fails consistently in the same way.
More interesting info. I’m essentially following this guide: https://discourse.prefect.io/t/how-can-i-create-a-subflow-and-block-until-it-s-completed/94
And if I use:
start_flow_run = StartFlowRun(project_name="myles", wait=True)

flow_a = start_flow_run(flow_name="A")
flow_a.set_upstream(rval_say_hi)
Everything executes as desired!

Kevin Kho

03/25/2022, 6:28 PM
A bit confused why your snippet has both StartFlowRun and create_flow_run?

Myles Steinhauser

03/25/2022, 6:29 PM
Because create_flow_run was broken.

Kevin Kho

03/25/2022, 6:30 PM
No but over here:
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
start_flow_run is not used, right?

Myles Steinhauser

03/25/2022, 6:31 PM
Ah, originally it was not used. I started going in that direction since create_flow_run kept failing. It seems like that’s the simpler API, though, so I was trying to stick with it.

Kevin Kho

03/25/2022, 6:32 PM
Ah ok, let me try that code without the start_flow_run. One sec.
👍 1

Myles Steinhauser

03/25/2022, 6:40 PM
Confirmed that StartFlowRun executes correctly and consistently:
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun, create_flow_run, wait_for_flow_run
from prefect.run_configs import ECSRun
from prefect.storage import S3
from prefect.executors import LocalDaskExecutor


FLOW_NAME = "ecs_demo_ecr"
S3_BUCKET="sw-app-sandbox"

# launchType="FARGATE"
launchType="EC2"

RUN_CONFIG = ECSRun(
    env={"SOME_VAR": "VALUE"},
    labels=["sandbox"],
    task_definition=dict(
        family=FLOW_NAME,
        requiresCompatibilities=[launchType],
        networkMode="bridge",
        cpu=128,
        memory=128,
        taskRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskRole-1DTCELFAE040E",
        executionRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskExecutionRole-61S1X6JMJ5A8",
        containerDefinitions=[
            dict(
                name="flow",
                image="prefecthq/prefect:latest-python3.9",
            )
        ],
    ),
    run_task_kwargs=dict(
        cluster="prefect-ecs-cluster-v6-ECSCluster-IgNcyDRUJqLa",
        launchType=launchType,
        enableECSManagedTags=True,
        enableExecuteCommand=True,
    ),
)

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    flow_a = StartFlowRun(flow_name="A", project_name="myles", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="myles", wait=True)
    flow_c = StartFlowRun(flow_name="C", project_name="myles", wait=True)
    flow_d = StartFlowRun(flow_name="D", project_name="myles", wait=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(flow_a)
    flow_c.set_upstream(flow_a)
    flow_d.set_upstream(flow_b)
    flow_d.set_upstream(flow_c)

Kevin Kho

03/25/2022, 6:41 PM
This is working for me:
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow.register("databricks")
flow2.register("databricks")
👀 1

Myles Steinhauser

03/25/2022, 6:42 PM
What happens if you define Flow("say_hi") in a different Python file and register it?

Kevin Kho

03/25/2022, 6:42 PM
I register the subflow and main flow in one go on local storage to be clear

Myles Steinhauser

03/25/2022, 6:42 PM
right

Kevin Kho

03/25/2022, 6:42 PM
Let me split it one sec

Myles Steinhauser

03/25/2022, 6:42 PM
I’m trying to compose multiple disparate Flows together (for ease of management, and overall ETL composition)

Kevin Kho

03/25/2022, 6:44 PM
So it still works
🤯 1
parent_flow
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow2.register("databricks")
subflow:
from prefect import task, Flow
import prefect
import datetime 

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

flow.register("databricks")

Myles Steinhauser

03/25/2022, 6:46 PM
This is wild to me! But, so helpful to have a confirmed working example. Going to take almost the same code and modify it for my ECS runs.

Kevin Kho

03/25/2022, 6:47 PM
I really can’t tell why yours is failing 🤨... Could you give me the full script for the failing one? You can remove sensitive stuff like the AWS details and DM it to me.

Myles Steinhauser

03/25/2022, 6:51 PM
Yep, testing a very lightly modified version of your script now.

Kevin Kho

03/25/2022, 6:52 PM
Do you get more logs doing flow.run() also? Or were you using flow.run() to test it? I’m trying to see which specific call failed.

Myles Steinhauser

03/25/2022, 6:53 PM
I’m using a cycle of prefect register --project myles -p parent-flow.py followed by prefect run --project myles --name parent-flow --watch.
this failed!
same error
I’ll DM you the code files

Kevin Kho

03/25/2022, 6:53 PM
ok
Talked to Myles. Summary: this seems to fail when he uses the ECS run config. There might be a version mismatch between registering with Python 3.10 and the ECS container running Python 3.9. Unsure at the moment and finding it hard to pin down. I need to test on ECS.
:upvote: 1

Myles Steinhauser

03/25/2022, 8:07 PM
Further investigation revealed that the issue occurs when the local environment is running Python 3.10.2 while the ECS agent is running Python 3.9.11. Once the local dev environment was downgraded to Python 3.9.11, all calls function as expected! This mattered most because I rely on prefect register on the local system to upload the flow to an S3 bucket for Agent execution.