I am using the `create_flow_run_from_deployment` / `run_deployment` APIs to compose my deployments o...
I am using the `create_flow_run_from_deployment` / `run_deployment` APIs to compose my deployments on HTTP request. E.g. I get a payload defining a set of flows to chain together, which triggers a parent container flow, and that container flow does the orchestration of tying things together. Because there are dependencies, the way I'm thinking of running it is to have the `run_deployment` code wrapped in a task like this:
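```python
from prefect import task
# Aliased so the task below can keep the name run_deployment
from prefect.deployments import run_deployment as trigger_deployment


@task
async def run_deployment(depl_id: str, parameters: dict):
    # Creates the flow run and, with the default timeout, waits for a final state.
    # Assumes a Prefect 2.x release whose run_deployment accepts a deployment ID as `name`.
    flow_run = await trigger_deployment(name=depl_id, parameters=parameters)
    # Reading the result assumes the child flow persists its results
    return await flow_run.state.result(fetch=True)
```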
and then in an orchestration flow I think I want to do something like this where I create these tasks for each deployment.
```python
from prefect import flow


@flow
async def container_flow(flow_graph: dict):
    futures = {}

    # order() is my own helper; it should return flow names in dependency order
    for name in order(flow_graph):
        flow_params = flow_graph[name]
        # set dependencies: futures of every upstream deployment
        upstream = [futures[dep] for dep in flow_params.get('dependencies', [])]
        # submit tasks so independent ones run concurrently; wait_for holds this
        # task back until its upstream tasks finish.
        # Assumes Prefect 2.x, where submit() and result() are awaited in async flows.
        futures[name] = await run_deployment.submit(
            depl_id=flow_params['deployment_id'],
            parameters=flow_params['inputs'],
            wait_for=upstream,
        )

    # store results
    return {name: await future.result() for name, future in futures.items()}
```
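
The `order` helper is not shown in the post. As a minimal sketch, assuming `flow_graph` maps each flow name to a dict with an optional `dependencies` list (as the loop above suggests), the standard library's `graphlib` could produce the ordering. The example payload and its IDs are hypothetical.

```python
from graphlib import TopologicalSorter


def order(flow_graph: dict) -> list[str]:
    """Return flow names so that every flow comes after its dependencies."""
    sorter = TopologicalSorter()
    for name, flow_params in flow_graph.items():
        # Register each flow together with the flows it depends on.
        sorter.add(name, *flow_params.get("dependencies", []))
    # static_order() raises graphlib.CycleError if the graph has a cycle.
    return list(sorter.static_order())


# Hypothetical payload: DeplB needs DeplA, DeplC needs both.
example_graph = {
    "DeplC": {"deployment_id": "id-c", "inputs": {}, "dependencies": ["DeplA", "DeplB"]},
    "DeplB": {"deployment_id": "id-b", "inputs": {}, "dependencies": ["DeplA"]},
    "DeplA": {"deployment_id": "id-a", "inputs": {}},
}
print(order(example_graph))  # → ['DeplA', 'DeplB', 'DeplC']
```

Ordering up front also surfaces cyclic payloads before any deployment is triggered, since a cycle raises an error instead of leaving the container flow waiting.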
Also, what would need consideration if I wanted to pass the resultant state of a deployment to its dependents without altering the logic for the dependent flows? E.g. `DeplB` expects input parameter `X` as the result of `DeplA`. I don't want to change the flow code in `DeplB` to optionally accept a `State` parameter. I just want to be able to pull it down from inside the `container_flow`.
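
One possible way to keep `DeplB` untouched is to resolve upstream results inside `container_flow` before the dependent deployment is triggered, so `DeplB` only ever receives a plain value for `X`. The sketch below is plain Python and assumes a hypothetical payload convention in which an input of the form `{"$result_of": "DeplA"}` is a reference to another deployment's result. Neither the marker nor `resolve_inputs` is part of Prefect.

```python
RESULT_REF_KEY = "$result_of"  # hypothetical marker, not a Prefect feature


def resolve_inputs(inputs: dict, results: dict) -> dict:
    """Replace {"$result_of": name} placeholders with already computed results."""
    resolved = {}
    for param, value in inputs.items():
        if isinstance(value, dict) and set(value) == {RESULT_REF_KEY}:
            # A KeyError here means the referenced deployment has not
            # produced a result yet or was never declared.
            resolved[param] = results[value[RESULT_REF_KEY]]
        else:
            resolved[param] = value
    return resolved


# DeplB declares that X should be whatever DeplA returned.
depl_b_inputs = {"X": {"$result_of": "DeplA"}, "threshold": 0.5}
upstream_results = {"DeplA": [1, 2, 3]}
print(resolve_inputs(depl_b_inputs, upstream_results))
# → {'X': [1, 2, 3], 'threshold': 0.5}
```

Inside the container flow, this would be called once the upstream results are available, and the returned dict passed as the deployment's `parameters`. Because deployment parameters are sent through the API, the resolved values need to be JSON-serializable.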