I am trying to use the `run_deployment` orchestrator pattern to chain together deployments dynamical...
a
I am trying to use the
run_deployment
orchestrator pattern to chain together deployments dynamically. Some of these deployments depend on others. I've attempted to finagle a way around this by wrapping
run_deployment
in a task as such:
Copy code
@task
async def run_deployment(depl_id: str, parameters: dict):
    async with prefect.context(**prefect.context.run_params):
        async with prefect.Client() as client:
            run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
            run_state = await run.get_state()
            return run_state.result
I then create tasks as such
Copy code
tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
             for name, flow_params in graph.items()}
and set their dependencies:
Copy code
# set dependencies
    for flow_name, flow_params in graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].set_upstream(tasks[dependency], flow=True)
the goal here is to dogfood the taskrunner to chain these together and kick off the flow with something like
await task_runner.submit(tasks)
Two issues I see with this though: 1. Some of the
parameters
to downstream
create_deployment
runs are derived from upstream deployment runs. I wonder if anyone knows an elegant way of doing that? 2.
await task_runner.submit(tasks)
would actually work as anticipated?