Aaron Goebel
01/11/2023, 2:20 AMrun_deployment
orchestrator pattern to chain together deployments dynamically. Some of these deployments depend on others. I've attempted to finagle a way around this by wrapping run_deployment
in a task as such:
@task
async def run_deployment(depl_id: str, parameters: dict):
async with prefect.context(**prefect.context.run_params):
async with prefect.Client() as client:
run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
run_state = await run.get_state()
return run_state.result
I then create tasks as such
tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
for name, flow_params in graph.items()}
and set their dependencies:
# set dependencies
for flow_name, flow_params in graph.items():
for dependency in flow_params.get('dependencies', []):
tasks[flow_name].set_upstream(tasks[dependency], flow=True)
the goal here is to dogfood the taskrunner to chain these together and kick off the flow with something like
await task_runner.submit(tasks)
Two issues I see with this though:
1. Some of the parameters
to downstream create_deployment
runs are derived from upstream deployment runs. I wonder if anyone knows an elegant way of doing that?
2. await task_runner.submit(tasks)
would actually work as anticipated?