I am using the
create_flow_run_from_deployment
/
run_deployment
APIs to compose my deployments on http request. E.g. I get a payload defining a set of flows to chain together, that triggers a parent container flow, and that container flow does the orchestration of tying things together.
The way I'm thinking of running it, because there are dependencies, is to have the
run_deployment
code wrapped in a task like this :
@task
async def run_deployment(depl_id: str, parameters: dict):
async with prefect.context(**prefect.context.run_params):
async with prefect.Client() as client:
run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
run_state = await run.get_state()
return run_state.result
and then in an orchestration flow I think I want to do something like this where I create these tasks for each deployment.
@flow
async def container_flow(flow_graph: dict):
results = {}
ordered_flows = order(flow_graph)
# create tasks
tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
for name, flow_params in flow_graph.items()}
# set dependencies
for flow_name, flow_params in flow_graph.items():
for dependency in flow_params.get('dependencies', []):
tasks[flow_name].set_upstream(tasks[dependency], flow=True)
# run tasks concurrently
flow_results = await prefect.engine.run(tasks, return_tasks=True)
# store results
for flow_name, flow_result in flow_results.items():
results[flow_name] = flow_result.result
return results