Aaron Goebel

01/11/2023, 2:20 AM
I am trying to use the
orchestrator pattern to chain together deployments dynamically. Some of these deployments depend on others. I've attempted to finagle a way around this by wrapping
in a task as such:
Copy code
async def run_deployment(depl_id: str, parameters: dict):
    async with prefect.context(**prefect.context.run_params):
        async with prefect.Client() as client:
            run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
            run_state = await run.get_state()
            return run_state.result
I then create tasks as such
Copy code
tasks = {name:['deployment_id'], parameters=flow_params['inputs'])
             for name, flow_params in graph.items()}
and set their dependencies:
Copy code
# set dependencies
    for flow_name, flow_params in graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].set_upstream(tasks[dependency], flow=True)
the goal here is to dogfood the taskrunner to chain these together and kick off the flow with something like
await task_runner.submit(tasks)
Two issues I see with this though: 1. Some of the
to downstream
runs are derived from upstream deployment runs. I wonder if anyone knows an elegant way of doing that? 2.
await task_runner.submit(tasks)
would actually work as anticipated?