Varun Joshi
06/08/2021, 5:17 PMKevin Kho
StartFlowRun
task with upstream dependencies set to do this.Varun Joshi
06/08/2021, 5:31 PMJoão Amorim
06/08/2021, 5:33 PMKevin Kho
from prefect import Flow, task
from prefect.tasks.prefect.flow_run import StartFlowRun
import prefect
@task
def test(x):
return x+1
with Flow("flowA") as flow1:
test(1)
with Flow("flowB") as flow2:
test(1)
flow1.register("omlds")
flow2.register("omlds")
with Flow("main") as flow:
a = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 1})()
b = StartFlowRun("flowA", "omlds",
new_flow_context=prefect.context.get('config'),
wait=True,
parameters={'x': 2})()
b.set_upstream(a)
flow.register("omlds")
Chris White
Varun Joshi
06/08/2021, 5:42 PM