Aaron Goebel
01/11/2023, 1:40 AMcreate_flow_run_from_deployment
/ run_deployment
APIs to compose my deployments on http request. E.g. I get a payload defining a set of flows to chain together, that triggers a parent container flow, and that container flow does the orchestration of tying things together.
The way I'm thinking of running it, because there are dependencies, is to have the run_deployment
code wrapped in a task like this :
@task
async def run_deployment(depl_id: str, parameters: dict):
async with prefect.context(**prefect.context.run_params):
async with prefect.Client() as client:
run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
run_state = await run.get_state()
return run_state.result
and then in an orchestration flow I think I want to do something like this where I create these tasks for each deployment.
@flow
async def container_flow(flow_graph: dict):
results = {}
ordered_flows = order(flow_graph)
# create tasks
tasks = {name: run_deployment.map(deployment_id=flow_params['deployment_id'], parameters=flow_params['inputs'])
for name, flow_params in flow_graph.items()}
# set dependencies
for flow_name, flow_params in flow_graph.items():
for dependency in flow_params.get('dependencies', []):
tasks[flow_name].set_upstream(tasks[dependency], flow=True)
# run tasks concurrently
flow_results = await prefect.engine.run(tasks, return_tasks=True)
# store results
for flow_name, flow_result in flow_results.items():
results[flow_name] = flow_result.result
return results
Aaron Goebel
01/11/2023, 1:46 AMDeplB
expects input parameter X
as the result of DeplA
. I don't want to change the flow code in DeplB
to optionally accept a State
parameter. I just want to be able to pull it down from inside the container_flow
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by