Myles Steinhauser
03/25/2022, 5:58 PM
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
More details in thread.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
return
with Flow(FLOW_NAME,
run_config=RUN_CONFIG,
storage=S3(bucket=S3_BUCKET),
executor=LocalDaskExecutor()
) as flow:
rval_say_hi = say_hi()
# assumes you have registered the following flows in a project named "myles"
flow_a = create_flow_run(flow_id="1c85cc1e-7e8c-46c9-86a2-d4beb75ea9ce")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
flow_b = create_flow_run(flow_name="B", project_name="myles")
wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
flow_c = create_flow_run(flow_name="C", project_name="myles")
wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
flow_d = create_flow_run(flow_name="D", project_name="myles")
wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
flow_a.set_upstream(rval_say_hi)
flow_b.set_upstream(wait_for_flow_a)
flow_c.set_upstream(wait_for_flow_a)
flow_d.set_upstream(wait_for_flow_b)
flow_d.set_upstream(wait_for_flow_c)
Child Flow (same across all 4 child flows):
@task
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
return
with Flow(FLOW_NAME,
# run_config=RUN_CONFIG,
storage=S3(bucket=S3_BUCKET),
executor=LocalDaskExecutor()
) as flow:
say_hi()
`create_flow_run()` call, regardless of whether I specify a `flow_id` or a `flow_name`, `project_name`.
The child flows all run correctly when run on their own, but the parent flow always fails to create them as children.
Kevin Kho
`flow_a` start here? What happens if you do?
flow_b = create_flow_run(flow_id=None, flow_name="B", project_name="myles")
And what is your Prefect version?
Myles Steinhauser
03/25/2022, 6:10 PM
`flow_a` never appears to start.
I’ll try that now
Running on prefect-1.1.0 with Prefect Cloud connected to ECSAgent
└── 14:18:05 | ERROR | Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/myles/Library/Caches/pypoetry/virtualenvs/saasworks-py-ScOzV3iT-py3.10/lib/python3.10/site-packages/prefect/tasks/prefect/flow_run.py", line 115, in create_flow_run
ValueError: Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
return
start_flow_run = StartFlowRun(project_name="myles", wait=True)
with Flow(FLOW_NAME,
run_config=RUN_CONFIG,
storage=S3(bucket=S3_BUCKET),
executor=LocalDaskExecutor()
) as flow:
rval_say_hi = say_hi()
# assumes you have registered the following flows in a project named "myles"
flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
flow_a.set_upstream(rval_say_hi)
Still fails consistently in the same way.
start_flow_run = StartFlowRun(project_name="myles", wait=True)
flow_a = start_flow_run(flow_name="A")
flow_a.set_upstream(rval_say_hi)
Everything executes as desired!
Kevin Kho
`StartFlowRun` and `create_flow_run`?
Myles Steinhauser
03/25/2022, 6:29 PM
`create_flow_run` was broken.
Kevin Kho
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
return
start_flow_run = StartFlowRun(project_name="myles", wait=True)
with Flow(FLOW_NAME,
run_config=RUN_CONFIG,
storage=S3(bucket=S3_BUCKET),
executor=LocalDaskExecutor()
) as flow:
rval_say_hi = say_hi()
# assumes you have registered the following flows in a project named "myles"
flow_a = create_flow_run(flow_id=None, flow_name="A", project_name="myles")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
flow_a.set_upstream(rval_say_hi)
`start_flow_run` is not used right?
Myles Steinhauser
03/25/2022, 6:31 PM
`create_flow_run` kept failing. It seems like that’s the simpler API, though, so I was trying to stick with it.
Kevin Kho
`start_flow_run`
one sec
Myles Steinhauser
03/25/2022, 6:40 PM
`StartFlowRun` executes correctly consistently:
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun, create_flow_run, wait_for_flow_run
from prefect.run_configs import ECSRun
from prefect.storage import S3
from prefect.executors import LocalDaskExecutor
FLOW_NAME = "ecs_demo_ecr"
S3_BUCKET="sw-app-sandbox"
# launchType="FARGATE"
launchType="EC2"
RUN_CONFIG = ECSRun(
env={"SOME_VAR": "VALUE"},
labels=["sandbox"],
task_definition=dict(
family=FLOW_NAME,
requiresCompatibilities=[launchType],
networkMode="bridge",
cpu=128,
memory=128,
taskRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskRole-1DTCELFAE040E",
executionRoleArn=f"arn:aws:iam::<redacted>:role/prefect-ecs-cluster-v6-ECSTaskExecutionRole-61S1X6JMJ5A8",
containerDefinitions=[
dict(
name="flow",
image="prefecthq/prefect:latest-python3.9",
)
],
),
run_task_kwargs=dict(
cluster="prefect-ecs-cluster-v6-ECSCluster-IgNcyDRUJqLa",
launchType=launchType,
enableECSManagedTags=True,
enableExecuteCommand=True,
),
)
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, FLOW_NAME)
return
with Flow(FLOW_NAME,
run_config=RUN_CONFIG,
storage=S3(bucket=S3_BUCKET),
executor=LocalDaskExecutor()
) as flow:
rval_say_hi = say_hi()
flow_a = StartFlowRun(flow_name="A", project_name="myles", wait=True)
flow_b = StartFlowRun(flow_name="B", project_name="myles", wait=True)
flow_c = StartFlowRun(flow_name="C", project_name="myles", wait=True)
flow_d = StartFlowRun(flow_name="D", project_name="myles", wait=True)
flow_a.set_upstream(rval_say_hi)
flow_b.set_upstream(flow_a)
flow_c.set_upstream(flow_a)
flow_d.set_upstream(flow_b)
flow_d.set_upstream(flow_c)
Kevin Kho
from prefect import task, Flow
import prefect
import datetime
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
FLOW_NAME = "test"
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
return
with Flow("say_hi") as flow:
rval_say_hi = say_hi()
with Flow(FLOW_NAME) as flow2:
rval_say_hi = say_hi()
flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
flow_a.set_upstream(rval_say_hi)
flow.register("databricks")
flow2.register("databricks")
Myles Steinhauser
03/25/2022, 6:42 PM
`Flow("say_hi")` in a different Python file and register it?
Kevin Kho
Myles Steinhauser
03/25/2022, 6:42 PM
Kevin Kho
Myles Steinhauser
03/25/2022, 6:42 PM
Kevin Kho
from prefect import task, Flow
import prefect
import datetime
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
FLOW_NAME = "test"
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
return
with Flow(FLOW_NAME) as flow2:
rval_say_hi = say_hi()
flow_a = create_flow_run(flow_name="say_hi", project_name="databricks")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
flow_a.set_upstream(rval_say_hi)
flow2.register("databricks")
subflow:
from prefect import task, Flow
import prefect
import datetime
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def say_hi():
logger = prefect.context.get("logger")
logger.info("Hi from Prefect %s from flow %s", prefect.__version__, prefect.context.flow_name)
return
with Flow("say_hi") as flow:
rval_say_hi = say_hi()
flow.register("databricks")
Myles Steinhauser
03/25/2022, 6:46 PM
Kevin Kho
Myles Steinhauser
03/25/2022, 6:51 PM
Kevin Kho
`flow.run()` also? Or were you using `flow.run()` to test it? I’m trying to see which specific call failed
Myles Steinhauser
03/25/2022, 6:53 PM
`prefect register --project myles -p parent-flow.py` and `prefect run --project myles --name parent-flow --watch` cycle
Kevin Kho
Myles Steinhauser
03/25/2022, 8:07 PM
`prefect register` on the local system to upload to an S3 bucket for Agent execution.