wonsun
11/02/2022, 8:36 AMFlowRunTask
to StartFlowRun
(reference link) among the methods written in that article to create the same situation. Below attached scripts, it is the case that two flows(flow_waveforms and flow_pipeline) that are executed well have already been created, and parent_flow is created to sequentially execute those flows.
# parent_flow.py
import prefect
from prefect.tasks.prefect.flow_run import StartFlowRun
with Flow('mother_flow') as flow:
first_job = StartFlowRun(flow_name='flow_waveforms',
project_name='InterFlow_Dependencies',
wait=True)
second_job = StartFlowRun(flow_name='flow_pipeline',
project_name='InterFlow_Dependencies',
wait=True)
first_job.set_downstream(second_job)
if __name__ == '__main__':
flow.register("InterFlow_Dependencies")
# flow_waveforms.py
import prefect
from prefect import Flow, task
@task
def task1():
something
return one, two
@task
def task2():
something
with Flow('flow_waveforms') as flow:
var1, var2 = task1()
finish = task2()
flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE':'thread'})
flow.register('InterFlow_Dependencies')
# flow_pipeline.py
import prefect
from prefect import Flow, task
@task
def task3():
something
with Flow('flow_pipelines') as flow:
task3()
flow.register('InterFlow_Dependencies')
If i excuted mother_flow.py in this case, prefect responds following error.(image)
Any idea where I went wrong? Or could you tell me if there is an easy way to define dependencies between flows other than this way?
Thx.Anna Geller
11/02/2022, 11:50 AMwonsun
11/03/2022, 12:44 AMAnna Geller
11/03/2022, 12:54 AMI'm still not perfect at using prefectme neither 🙌 registration shouldn't really be in the flow, it's a separate step - this might be helpful to understand what I mean https://github.com/anna-geller/orchestrator-pattern/tree/master/simple_parent_child_example
wonsun
11/03/2022, 1:13 AMAnna Geller
11/03/2022, 11:58 AM