# prefect-community
a
I am trying to use the orchestrator pattern to chain together deployments dynamically at run time. I need deployments because I need some of my pipeline code to run on specific machines. I already have a DAG defining how the deployments should be chained, and I've also determined that some outputs of deployment runs should be used as inputs to other `run_deployment` invocations. I'd like to use Prefect for the scheduling of everything, so my approach has been to wrap the `run_deployment` call in a `task`, e.g.:
Copy code
from prefect import task, get_client

@task
async def run_deployment_task(depl_id: str, parameters: dict):
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=depl_id, parameters=parameters
        )
    # NOTE: assumes the run has finished; in practice the state would need
    # to be polled until completion before reading the result
    return flow_run.state.result()
So, given a set of deployments, their dependencies, and their parameters in a graph, I basically want to run something like this:
Copy code
from prefect import flow

@flow
async def run_deployments_flow(graph: dict):
    # create tasks
    tasks = {
        name: run_deployment_task.submit(
            depl_id=flow_params['deployment_id'],
            parameters=flow_params['inputs'],
        )
        for name, flow_params in graph.items()
    }

    # set task dependencies (aspirational API: in Prefect, wait_for is
    # passed to .submit() rather than called on a future)
    for flow_name, flow_params in graph.items():
        for dependency in flow_params.get('dependencies', []):
            tasks[flow_name].wait_for(tasks[dependency])

    # await all tasks
In this way, the task runner deals with scheduling, managing concurrency, etc. HOWEVER: my issue is that the output of some task runs needs piping into downstream tasks. This doesn't work natively with tasks, so with this approach I'd need to deal with the looping, polling of task status, etc. manually. Essentially I think I'd like something like:
Copy code
for flow_name, flow_params in graph.items():
    upstream_future_states = []
    for dependency in flow_params.get('dependencies', []):
        upstream_future_states.append(tasks[dependency].state)
    # made-up helper: wire upstream futures into the downstream task's parameters
    inject_upstream_future_parameters_into(upstream_future_states, tasks[flow_name])
where I could have `run_deployment_task` accept futures of the upstream results and resolve them in the parameters before kicking off a run. So it'd look something like:
Copy code
import asyncio
from typing import List, Optional

@task
async def run_deployment_task(depl_id: str, parameters: dict, upstream_futures: Optional[List[asyncio.Future]] = None):
    # resolve upstream results before building the final parameter set
    upstream_results = await asyncio.gather(*(upstream_futures or []))
    full_parameters = merge(parameters, upstream_results)  # made-up merge helper
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=depl_id, parameters=full_parameters
        )
    return flow_run.state.result()  # again assumes the run has finished
Is there any way I can use Prefect to get this kind of desired result? Also looking for just general feedback on the approach. I know I can do a topological sort, run a while loop, kick off each deployment without dependencies manually, await its result, and pipe the results into downstream tasks when all of the results they depend on are resolved. I'm trying to avoid that if possible (rough sketch below). tldr: I want to dynamically chain deployments together where outputs of one may be inputs of another, and I want Prefect to manage the orchestration of it all. Perhaps tasks aren't the way, and there's a subflow approach?
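For reference, the manual approach mentioned above, sketched level-by-level (`resolve_inputs` is a made-up helper that merges upstream results into a node's parameters):
Copy code
import asyncio

from prefect import flow

@flow
async def run_graph_manually(graph: dict):
    results = {}
    remaining = dict(graph)
    while remaining:  # assumes the graph is acyclic
        # nodes whose dependencies have all finished are ready to run
        ready = [name for name in remaining
                 if all(dep in results
                        for dep in remaining[name].get('dependencies', []))]
        # run the whole level concurrently and wait for it
        level_results = await asyncio.gather(*(
            run_deployment_task(
                depl_id=remaining[name]['deployment_id'],
                parameters=resolve_inputs(remaining[name], results),  # made-up helper
            )
            for name in ready
        ))
        for name, result in zip(ready, level_results):
            results[name] = result
            remaining.pop(name)
    return results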
n
Hi @Aaron Goebel - is there a reason you're using the client directly and not `prefect.deployments.run_deployment`? The `run_deployment` utility will create a new run from a deployment by `name` and mark it as a subflow run of the flow that's calling it, e.g.:
Copy code
from prefect import flow, task
from prefect.context import get_run_context
from prefect.deployments import run_deployment
from typing import Any, Dict


# deploy with prefect deployment build flow_chain.py:worker --name 'test-worker' -a
@flow(log_prints=True, persist_result=True)
def worker(message: str) -> str:
    print(f"Received message from parent: {message!r}")
    return f"Hello from worker run named: {get_run_context().flow_run.name}"

@task # this doesn't really need to be a task, but I imagine you may want to map many runs of same deployment sometimes
def kick_off_worker(name: str, params: Dict[str, Any]) -> "FlowRun":
    return run_deployment(
        name=name,
        parameters=params,
    )

# deploy with prefect deployment build flow_chain.py:orchestrator --name 'test-orchestrator' -a
@flow(log_prints=True)
def orchestrator(worker_params: Dict[str, Any]): # `worker_params` could also be a nested Dict with many worker params
    worker_A_run = kick_off_worker(
        name="worker/test-worker", params=worker_params
    )
        
    # below prints "Hello from worker run named: lean-chamois"
    print(worker_A_run.state.result())
    
    # now run other worker(s) that depend on A, now that its result is available
    
if __name__ == "__main__":
    orchestrator(
        worker_params={
            "message": "Hello from parent!",
        }
    )
note that I have `persist_result=True` (docs) on the child flow
🙌 1
a
I didn't know that `from prefect.deployments import run_deployment` provided an alternative. I want the ability to run some deployments in parallel, so I think wrapping them in a task is the path forward? To get my desired effect, it looks like I would need to do a kind of topological graph traversal, at each stage running `kick_off_worker`, awaiting results, and keeping them stored somewhere for subsequent calls to `kick_off_worker`. I was hoping there was a way, with just the Prefect APIs like `wait_for` etc., that I could sidestep having to do all of the control flow inside of the `orchestrator` flow. But it seems that's not doable?
Or will the subflow concept capture that `wait_for` capability, and I can just pass the state object to the `kick_off_worker` task?
n
> I want the ability to run some deployments in parallel, so I think wrapping them in a task is the path forward?
that was my thought too, that's why I wrapped `run_deployment` in a task, so you could map if you're going to run deployments in parallel that don't have data deps (could be different deployments, where you'd just have to pass `.map()` iterables of the same length for `name` and `params`, or many runs of the same deployment, where you'd pass `.map()` an `unmapped(name)` and an iterable of `params` for each worker run)
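a quick sketch of both cases (the second deployment name here is made up):
Copy code
from prefect import unmapped

# many runs of the same deployment: unmapped name, iterable of params
runs = kick_off_worker.map(
    name=unmapped("worker/test-worker"),
    params=[{"message": f"hello {i}"} for i in range(3)],
)

# different deployments without data deps: same-length iterables
runs = kick_off_worker.map(
    name=["worker/test-worker", "other-flow/test-other"],  # made-up second name
    params=[{"message": "a"}, {"message": "b"}],
)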
> and I can just pass the state object to the `kick_off_worker` task
if I'm understanding your point, I think yes, that should work
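something like this sketch, reusing the `worker` example above (inside `orchestrator`, feeding worker A's result into the next run's params):
Copy code
# feed worker A's result into the params for the next run
worker_A_run = kick_off_worker(name="worker/test-worker", params=worker_params)
worker_B_run = kick_off_worker(
    name="worker/test-worker",  # or any downstream deployment
    params={"message": worker_A_run.state.result()},
)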
going back to
> I already have a DAG defining how the deployments should be chained, and I've also determined that some outputs of deployment runs should be used as inputs to other `run_deployment` invocations.
because you don't need to know the graph before runtime in Prefect 2, this is how I imagine the purpose of the `orchestrator` flow in my example: it is the definition of which dependencies should exist between subflows / processes. And imo, the nice thing about defining it in the flow is that you can have conditional logic based on your subflow results to decide during runtime how to continue with downstream things
💯 1
a
Thank you!
n
sure thing!