Stephen Lloyd
03/29/2023, 8:18 AM@task
def run_single_flow(deployment_name: str):
return run_deployment(name=deployment_name)
@flow
def orchestrate_flows():
logger = get_run_logger()
flow_name = "scheduler"
<http://logger.info|logger.info>(contexts.build_flow_run_url())
deployments = [
"my-flow/some_other_flow",
"another-flow/some_flow",
"final-flow/some_final_flow",
]
futures = []
for d in deployments:
run = run_single_flow.submit(d)
#figure out how to get the flow_run_id her
<http://logger.info|logger.info>(run.something.flow_run_id)
futures.append(run)
waiting = [f.wait() for f in futures]
for x in waiting:
<http://logger.info|logger.info>(x.result().id)
I can get this if I wait()
for the result, but that doesn't help me as much.Dominic Tarro
03/29/2023, 1:05 PMStephen Lloyd
03/30/2023, 4:09 AMrun_deployment
function both creates and runs a flow run.
If you have a moment, could you flesh out what you mean a little more?run_deployment
do not show up in the calling flow.
⢠Creating and logging a url-string to a flow run created by run_deployment()
⦠I cannot seem to find the flow_run_id
prior to flow run reach a completed sate
⦠This seems like an opportunity for improvement!
⢠Just hoping I will figure it outEmil Christensen
03/30/2023, 5:34 PMrun_deployment
you can pass timeout=0
which will immediately return the flow_run
where you can get the ID with flow_run.id
.Stephen Lloyd
04/06/2023, 3:19 AMtimeout=0
the State no longer updates. my_flow_run.wait()
does not seem to behave as intended. The flow seems to think the wait is over. š
Is this to be expected?