saml
11/03/2021, 5:56 PMKevin Kho
StartFlowRun
task and create_flow_run
task in the third flow and then put your additional tasks after that. The important of the terminal task into the master flow won’t work I thinksaml
11/03/2021, 6:04 PMsaml
11/03/2021, 6:05 PMwith Flow("Cases Analysis", executor=exe) as flow:
saml
11/03/2021, 6:06 PM#flow c as currently defined
flow_run = StartFlowRun(flow_name="Cases Analysis", project_name="Analysis")
with Flow("parent-flow") as flow:
flow_run()
flow.run()
saml
11/03/2021, 6:07 PMKevin Kho
StartFlowRun
will run a registered flow so the flow_a and flow_b need to be registered in the backend (Cloud or Server) for this to work.saml
11/03/2021, 6:08 PMKevin Kho
StartFlowRun
task, no you can’t. I have an idea that I’ll mention but I’m not sure it’ll work. Let me show you
@task
def abc(...):
return 1
@task
def bcd(...):
return 2
with Flow(...) as flow:
abc()
# notice flow is already defined. we can add another task
with flow:
bcd()
Kevin Kho
saml
11/03/2021, 6:11 PMsaml
11/03/2021, 6:12 PMsaml
11/03/2021, 6:12 PMKevin Kho
flow A
, and then flow B
imports flow
and adds more tasks and then flow C
imports the updated flow
and then adds more tasks. So it’s still pretty uglysaml
11/03/2021, 6:14 PMsaml
11/03/2021, 6:15 PMKevin Kho
flow_a
and flow_b
. Just import that tasks from flow_c
and construct the whole flow there. Really though I guess the standard way is creating subflows. The experience is rough for sure so for Prefect 2.0 these subflows will be a first class part of the main flow.Kevin Kho
saml
11/03/2021, 6:16 PMsaml
11/03/2021, 6:43 PMfrom case_flow import flow as case_flow
from case_flow import task_save_case_final_data
@task
def test_final_task():
print("final task sleep")
sleep(10)
print("done sleeping")
final_flow = Flow("parent-flow")
final_flow.update(case_flow)
final_flow.add_task(test_final_task(flow=final_flow, upstream_tasks=[case_flow.reference_tasks()]))
final_flow.visualize()
Kevin Kho
saml
11/03/2021, 6:51 PMfrom prefect import task, Flow
from time import sleep
from case_flow import flow as case_flow
@task
def test_final_task(other_task_str:str):
print("final task sleep")
print(other_task_str)
sleep(10)
print("done sleeping")
@task
def other_task():
return "I'm another task"
with Flow("other flow") as other_flow:
other_task()
final_flow = Flow("parent-flow")
final_flow.update(case_flow)
final_flow.add_task(test_final_task(other_task_str=other_flow.get_tasks(name='other_task'), flow=final_flow , upstream_tasks=[case_flow.reference_tasks()]))
final_flow.visualize()
final_flow.run()
saml
11/03/2021, 6:51 PMKevin Kho
saml
11/03/2021, 6:53 PMKevin Kho
StartFlowRun
already since you can just register the subflow and then invoke it from the main flow. I think the flexibility this provides is on the development side for sure.saml
11/03/2021, 6:57 PMKevin Kho
create_flow_run
, wait_for_flow_run
, and get_task_run_result
. The get_task_run_result
loads it from the result location. With your current set-up though, it will indeed be memory since they all will get registered togethersaml
11/03/2021, 7:02 PM