# prefect-community
m
I’m trying to follow the Scheduling a Flow-of-Flows example in the docs, but I keep running into the following error:
```
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
```
More details in thread.
Parent Flow:
```python
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()


    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id="1c85cc1e-7e8c-46c9-86a2-d4beb75ea9ce")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)

    flow_b = create_flow_run(flow_name="B", project_name="myles")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)

    flow_c = create_flow_run(flow_name="C", project_name="myles")
    wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)

    flow_d = create_flow_run(flow_name="D", project_name="myles")
    wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(wait_for_flow_a)
    flow_c.set_upstream(wait_for_flow_a)
    flow_d.set_upstream(wait_for_flow_b)
    flow_d.set_upstream(wait_for_flow_c)
```
Child Flow (same across all 4 child flows):
```python
@task
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
        #   run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    say_hi()
```
Based on the logs available to me, the parent flow always fails at the `create_flow_run()` call, regardless of whether I specify a `flow_id` or a `flow_name` and `project_name`.
The child flows all run correctly when run on their own, but the parent flow always fails to create them as children.
k
This is weird. Does `flow_a` start here? What happens if you do:
```python
flow_b = create_flow_run(flow_id=None, flow_name="B", project_name="myles")
```
And what is your Prefect version?
m
Nope, `flow_a` never appears to start. I’ll try that now. Running on prefect-1.1.0 with Prefect Cloud connected to an ECSAgent.
Nothing changes, all of the same failures:
```
└── 14:18:05 | ERROR   | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/myles/Library/Caches/pypoetry/virtualenvs/saasworks-py-ScOzV3iT-py3.10/lib/python3.10/site-packages/prefect/tasks/prefect/flow_run.py", line 115, in create_flow_run
ValueError: Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.
```
Stepping back to an even simpler test case:
```python
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
```
Still fails consistently in the same way.
More interesting info: I’m essentially following this guide: https://discourse.prefect.io/t/how-can-i-create-a-subflow-and-block-until-it-s-completed/94
And if I use:
```python
start_flow_run = StartFlowRun(project_name="myles", wait=True)

flow_a = start_flow_run(flow_name="A")
flow_a.set_upstream(rval_say_hi)
```
Everything executes as desired!
k
A bit confused why your snippet has both `StartFlowRun` and `create_flow_run`?
m
Because `create_flow_run` was broken.
k
No, but over here:
```python
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


start_flow_run = StartFlowRun(project_name="myles", wait=True)


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    # assumes you have registered the following flows in a project named "myles"
    flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)
```
`start_flow_run` is not used, right?
m
Ah, originally it was not used. I started going in that direction since `create_flow_run` kept failing. It seems like that’s the simpler API, though, so I was trying to stick with it.
k
Ah OK, let me try this with that code without the `start_flow_run`. One sec.
👍 1
m
Confirmed that `StartFlowRun` consistently executes correctly:
```python
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun, create_flow_run, wait_for_flow_run
from prefect.run_configs import ECSRun
from prefect.storage import S3
from prefect.executors import LocalDaskExecutor


FLOW_NAME = "ecs_demo_ecr"
S3_BUCKET="sw-app-sandbox"

# launchType="FARGATE"
launchType="EC2"

RUN_CONFIG = ECSRun(
    env={"SOME_VAR": "VALUE"},
    labels=["sandbox"],
    task_definition=dict(
        family=FLOW_NAME,
        requiresCompatibilities=[launchType],
        networkMode="bridge",
        cpu=128,
        memory=128,
        taskRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskRole-1DTCELFAE040E",
        executionRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskExecutionRole-61S1X6JMJ5A8",
        containerDefinitions=[
            dict(
                name="flow",
                image="prefecthq/prefect:latest-python3.9",
            )
        ],
    ),
    run_task_kwargs=dict(
        cluster="prefect-ecs-cluster-v6-ECSCluster-IgNcyDRUJqLa",
        launchType=launchType,
        enableECSManagedTags=True,
        enableExecuteCommand=True,
    ),
)

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
    return


with Flow(FLOW_NAME,
          run_config=RUN_CONFIG,
          storage=S3(bucket=S3_BUCKET),
          executor=LocalDaskExecutor()
          ) as flow:
    rval_say_hi = say_hi()

    flow_a = StartFlowRun(flow_name="A", project_name="myles", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="myles", wait=True)
    flow_c = StartFlowRun(flow_name="C", project_name="myles", wait=True)
    flow_d = StartFlowRun(flow_name="D", project_name="myles", wait=True)

    flow_a.set_upstream(rval_say_hi)
    flow_b.set_upstream(flow_a)
    flow_c.set_upstream(flow_a)
    flow_d.set_upstream(flow_b)
    flow_d.set_upstream(flow_c)
```
k
This is working for me:
```python
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow.register("databricks")
flow2.register("databricks")
```
👀 1
m
What happens if you define `Flow("say_hi")` in a different Python file and register it?
k
To be clear, I register the subflow and the main flow in one go, using Local storage.
m
right
k
Let me split it one sec
m
I’m trying to compose multiple disparate Flows together (for ease of management, and overall ETL composition)
k
So it still works
🤯 1
parent flow:
```python
from prefect import task, Flow
import prefect
import datetime 
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "test"

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow(FLOW_NAME) as flow2:
    rval_say_hi = say_hi()
    flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
    flow_a.set_upstream(rval_say_hi)

flow2.register("databricks")
```
subflow:
```python
from prefect import task, Flow
import prefect
import datetime 

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
    logger = prefect.context.get("logger")
    logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
    return

with Flow("say_hi") as flow:
    rval_say_hi = say_hi()

flow.register("databricks")
```
m
This is wild to me! But so helpful to have a confirmed working example. Going to take almost the same code and modify it for my ECS runs.
k
I really can’t tell why yours is failing 🤨… could you give me the full script for the failing one? You can remove sensitive stuff like the AWS details and DM it to me.
m
Yep, testing a very lightly modified version of your script now.
k
Do you get more logs doing `flow.run()` also? Or were you using `flow.run()` to test it? I’m trying to see which specific call failed.
m
I’m using the `prefect register --project myles -p parent-flow.py` and `prefect run --project myles --name parent-flow --watch` cycle.
this failed!
same error
I’ll DM you the code files
k
ok
Talked to Myles: the summary is that this seems to fail when he uses the ECS RunConfig. There might be a versioning issue, with registration done under Python 3.10 and the ECS container running Python 3.9. Unsure at the moment and finding it hard to figure out. I need to test on ECS.
upvote 1
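A quick way to confirm a mismatch like this (a minimal sketch, not from the thread) is to log the interpreter version from inside a task, so the Python actually running on the agent side shows up in the flow-run logs:
```python
import sys

import prefect
from prefect import task


@task
def log_versions():
    # Logs the Python and Prefect versions of the environment that actually
    # executes the task (here, the ECS container), which can be compared
    # against the environment used for `prefect register`.
    logger = prefect.context.get("logger")
    logger.info("Python %s / Prefect %s", sys.version, prefect.__version__)
```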
m
Further investigation revealed that the issue occurs when the local environment runs Python 3.10.2 while the ECS agent runs Python 3.9.11. Once the local dev environment was downgraded to Python 3.9.11, all calls functioned as expected! This was most critical because we rely on `prefect register` on the local system to upload the flow to an S3 bucket for agent execution.
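One way to sidestep this class of mismatch entirely (a hedged sketch, not something done in the thread) is to store the flow as a script rather than as a pickle, so the agent re-imports the source with its own interpreter instead of unpickling objects serialized under a different Python version. Prefect 1.x `S3` storage supports this via `stored_as_script`; the `key` and `local_script_path` values below are illustrative:
```python
from prefect.storage import S3

# Upload the flow's source file to S3 instead of a cloudpickle blob. The
# executing container then imports the script under its own Python version
# rather than unpickling objects built under a different one.
# The bucket name is reused from the thread; key/path are hypothetical.
STORAGE = S3(
    bucket="sw-app-sandbox",
    key="flows/parent-flow.py",          # hypothetical S3 key
    stored_as_script=True,
    local_script_path="parent-flow.py",  # hypothetical local path
)
```
Alternatively, matching the local registration environment to the container’s Python minor version (as Myles did) avoids the problem without changing storage.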