Marius Vollmer
05/12/2023, 12:37 PM

Jeff Hale
05/12/2023, 3:33 PM
What does prefect version show?

Zanie
05/12/2023, 3:35 PM
run_deployment?

Marius Vollmer
05/12/2023, 5:08 PM
run_deployment, e.g.
deployed_run = run_deployment(
    name="c35416ba-60dc-4394-b30f-06fb24a3173e",
    flow_run_name=f"nicc_ecom_orders_consignment_header_worker_flow_dl for batch_id {batch_id}"
)
Zanie
05/12/2023, 6:36 PM

from prefect import get_client

async def main(flow_run_id):
    async with get_client() as client:
        states = await client.read_flow_run_states(flow_run_id)
        for state in states:
            print(state.timestamp, state.type.name, state.name)

import asyncio
import sys

asyncio.run(main(sys.argv[1]))
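For reference, the snippet above takes the flow run ID as its first command-line argument, so (assuming it is saved as read_states.py, a hypothetical file name) it could be invoked like this:

python read_states.py <flow-run-id>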
Marius Vollmer
05/13/2023, 11:33 AM

2023-05-13T11:24:48.370183+00:00 SCHEDULED Scheduled
2023-05-13T11:24:53.936142+00:00 SCHEDULED Late
2023-05-13T11:24:54.602460+00:00 PENDING Pending
2023-05-13T11:25:43.581055+00:00 RUNNING Running
2023-05-13T11:27:31.580192+00:00 FAILED Failed
Same for the following ones (except Late, which was caused by another issue).
Sorry, one more piece of info I should have shared at the beginning: a loop is used to process batches sequentially, so the run_deployment is inside a loop (snippet below). Can this be the reason? My assumption so far was that a parent flow fails as soon as a subflow fails.
while loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name):
    batch_id = loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name)
    logger.info('+++++++++++++++++++++++++++++++++++ Next batch: %s' % batch_id + '+++++++++++++++++++++++++++++++++++')
    deployed_run = run_deployment(
        name="1a5a9bc2-9b9c-46db-8c88-0b34b9d92f72",
        flow_run_name=f"nicc_ecom_orders_order_position_worker_flow_dl for batch_id {batch_id}",
    )
My target behaviour would be to exit the loop in case of a failed run and also to mark the parent flow as failed.

Zanie
05/16/2023, 3:08 PM

deployed_run = run_deployment(....)
if deployed_run.state.is_failed():
    # abort!
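Combining that check with the loop from the earlier snippet, a minimal sketch of the parent flow could look like the following. It assumes loop_for_next_batch, stg_model, dl_database and dl_table_name are defined as in the snippet above, and uses a placeholder deployment name; the key point is that raising an exception inside the parent flow both exits the loop and marks the parent flow run as failed.

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment

@flow
def parent_flow():
    logger = get_run_logger()
    while loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name):
        batch_id = loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name)
        logger.info('Next batch: %s', batch_id)
        # By default run_deployment waits for the triggered flow run to finish
        # and returns the FlowRun object, so its final state can be inspected here.
        deployed_run = run_deployment(
            name="my-flow/my-deployment",  # placeholder; use your deployment name or ID
            flow_run_name=f"worker flow for batch_id {batch_id}",
        )
        if deployed_run.state.is_failed():
            # Raising stops the loop and marks the parent flow run as failed.
            raise RuntimeError(f"Worker flow run for batch_id {batch_id} failed")

Depending on requirements, the check could also cover crashed runs (state.is_crashed()), or the parent could return a Failed state from prefect.states instead of raising.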