Harry Baker
03/25/2022, 8:45 PM@task()
def flow_run_helper(flow_name, project_name):
cfr = create_flow_run(flow_name=flow_name, project_name=project_name)
wfr = wait_for_flow_run(cfr, stream_logs=True, raise_final_state=True)
return wfr
but its yelling at me about "ValueError: Could not infer an active Flow context while creating edge". my app does a lot of chaining of flows, so i wanted to streamline thisAnna Geller
Kevin Kho
StartFlowRun(..., wait=True)
does it in one goAnna Geller
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True)
with Flow("FLOW_NAME") as flow:
staging = start_flow_run(flow_name="child_flow_name")
Harry Baker
03/25/2022, 8:47 PMsd_id = create_flow_run(flow_name='sync_data', project_name="proj")
sd_wait = wait_for_flow_run(sd_id, stream_logs=True, raise_final_state=True)
pd_id = create_flow_run(flow_name='process_data', project_name="proj")
pd_wait = wait_for_flow_run(pd_id, stream_logs=True, raise_final_state=True)
pd_id.set_upstream(sd_wait)
ed_id = create_flow_run(flow_name='export_data', project_name="proj")
ed_wait = wait_for_flow_run(ed_id, stream_logs=True, raise_final_state=True)
ed_id.set_upstream(pd_wait)
Anna Geller
Kevin Kho
.run()
. To use a task inside another task:
@task
def abc():
bcd.run(...)
cde.run(...)
but note it’s not a task. it’s just the Python function underneathHarry Baker
03/25/2022, 8:53 PM