
Marius Vollmer

05/12/2023, 12:37 PM
Hi everyone, we are running Prefect (2.10.7) in AKS using the orchestrator pattern. A subflow failed, but it keeps recreating itself over and over again. Even canceling the current subflow run via the CLI does not help; it only stops when I delete the corresponding deployment entirely. We haven't configured any retries (neither in the flow code nor via environment variables), so I am wondering what causes this behaviour. Thanks in advance!

Jeff Hale

05/12/2023, 3:33 PM
What does `prefect version` show?

Zanie

05/12/2023, 3:35 PM
A subflow you are running via `run_deployment`?

Marius Vollmer

05/12/2023, 5:08 PM
@Jeff Hale, on my local machine? It shows:
Version: 2.8.7
API version: 0.8.4
Python version: 3.9.13
Git commit: a6d6c6fc
Built: Thu, Mar 23, 2023 3:27 PM
OS/Arch: win32/AMD64
Profile: profileabc
Server type: server
Thanks for reminding me to update it to match the server version as well 🙂 Just did it. @Zanie, yes, we are running them with `run_deployment`, e.g.:
deployed_run = run_deployment(
    name="c35416ba-60dc-4394-b30f-06fb24a3173e",
    flow_run_name=f"nicc_ecom_orders_consignment_header_worker_flow_dl for batch_id {batch_id}",
)

Zanie

05/12/2023, 6:36 PM
Can you use this script to get the state transitions for the subflow run and its parent run?
import asyncio
import sys

from prefect import get_client


async def main(flow_run_id):
    # Print every state the given flow run has gone through
    async with get_client() as client:
        states = await client.read_flow_run_states(flow_run_id)
        for state in states:
            print(state.timestamp, state.type.name, state.name)


if __name__ == "__main__":
    # Usage: python script.py <flow-run-id>
    asyncio.run(main(sys.argv[1]))

Marius Vollmer

05/13/2023, 11:33 AM
Sure, please find them below:
2023-05-13T11:24:48.370183+00:00 SCHEDULED Scheduled
2023-05-13T11:24:53.936142+00:00 SCHEDULED Late 
2023-05-13T11:24:54.602460+00:00 PENDING Pending
2023-05-13T11:25:43.581055+00:00 RUNNING Running
2023-05-13T11:27:31.580192+00:00 FAILED Failed
The following runs show the same transitions (except for Late, which was caused by another issue). Sorry, one more piece of information I should have shared at the beginning: we use a loop to process batches sequentially, so the `run_deployment` call sits inside a loop (snippet below). Could this be the reason? My assumption so far was that a parent flow fails as soon as one of its subflows fails.
while loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name):
    batch_id = loop_for_next_batch.next_batch_available(stg_model, dl_database, dl_table_name)
    logger.info('+++++++++++++++++++++++++++++++++++ Next batch: %s' % batch_id + '+++++++++++++++++++++++++++++++++++')
    deployed_run = run_deployment(
        name="1a5a9bc2-9b9c-46db-8c88-0b34b9d92f72",
        flow_run_name=f"nicc_ecom_orders_order_position_worker_flow_dl for batch_id {batch_id}",
    )
My target behaviour would be to exit the loop in case of a failed run and also to mark the parent flow as failed.
Hi Zanie, sorry to bother you again with this. I assume it's because of the loop construct, right? Could you give me a hint on how to read the state after each iteration and then stop the parent flow if necessary? I don't have a background in software development, so I'm trying my luck here once again 🙂
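One plausible explanation, offered as an assumption rather than a confirmed diagnosis: `run_deployment` returns the worker's flow run object instead of raising when that run fails, so the parent does not fail on its own. If, in addition, `next_batch_available` keeps returning the same batch_id until that batch has been processed successfully, the loop triggers a fresh worker run for the failed batch on every iteration, which would look exactly like a subflow that keeps recreating itself. The plain-Python simulation below illustrates this; `make_batch_source`, `run_worker` and `unguarded_loop` are hypothetical stand-ins (not Prefect APIs), and batch 2 is made to fail on purpose.

```python
# Hypothetical stand-ins (not Prefect APIs): a batch stays "available"
# until a worker run for it succeeds, and the worker always fails on batch 2.
def make_batch_source(batch_ids):
    pending = list(batch_ids)

    def next_batch_available():
        # Oldest unprocessed batch id, or None when everything is done
        return pending[0] if pending else None

    def mark_done(batch_id):
        pending.remove(batch_id)

    return next_batch_available, mark_done


def run_worker(batch_id):
    # Stand-in for run_deployment(...); True means the run completed
    return batch_id != 2


def unguarded_loop(batch_ids, max_iterations=10):
    """Mirror of the original loop: the worker result is never checked."""
    next_batch_available, mark_done = make_batch_source(batch_ids)
    triggered = []
    while batch_id := next_batch_available():
        if len(triggered) >= max_iterations:
            break  # safety cap so the demo terminates; the real loop has none
        triggered.append(batch_id)
        if run_worker(batch_id):
            # Simulates the worker marking its batch as processed on success
            mark_done(batch_id)
        # On failure nothing changes, so the same batch comes back next time
    return triggered


print(unguarded_loop([1, 2, 3]))  # → [1, 2, 2, 2, 2, 2, 2, 2, 2, 2]
```

Without the artificial cap, the loop would keep re-triggering batch 2 indefinitely.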

Zanie

05/16/2023, 3:08 PM
deployed_run = run_deployment(...)
if deployed_run.state.is_failed():
    ...  # abort!
Like that?
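A minimal sketch of that check applied inside the batch loop, under stated assumptions: `process_batches`, its two callable parameters, and the fakes at the bottom are hypothetical names for illustration. In the real parent flow, `next_batch_available` would wrap the `loop_for_next_batch` helper and `trigger_run` would wrap the `run_deployment(...)` call from the snippets above. It checks `is_completed()` rather than only `is_failed()` so that Crashed or Cancelled worker runs also stop the loop. With no `timeout` argument, `run_deployment` waits until the worker run reaches a final state, so the state is final by the time it is checked.

```python
from types import SimpleNamespace


def process_batches(next_batch_available, trigger_run):
    """Trigger one worker run per batch; stop at the first run that did not complete.

    next_batch_available() returns the next batch id, or a falsy value when done.
    trigger_run(batch_id) returns an object whose .state offers is_completed()
    and .name, like the FlowRun returned by Prefect's run_deployment.
    """
    processed = []
    while batch_id := next_batch_available():
        flow_run = trigger_run(batch_id)
        if not flow_run.state.is_completed():
            # Raising inside a Prefect flow marks that flow run (the parent) as Failed
            raise RuntimeError(
                f"Worker run for batch_id {batch_id} ended in state {flow_run.state.name}"
            )
        processed.append(batch_id)
    return processed


# Usage with fakes, no Prefect server needed: a batch stays pending until
# its run succeeds, and the run for batch 2 fails.
pending = [1, 2, 3]


def fake_next_batch():
    return pending[0] if pending else None


def fake_run(batch_id):
    ok = batch_id != 2
    if ok:
        pending.remove(batch_id)  # simulate the worker marking its batch as done
    state = SimpleNamespace(
        is_completed=lambda: ok,
        name="Completed" if ok else "Failed",
    )
    return SimpleNamespace(state=state)


try:
    process_batches(fake_next_batch, fake_run)
except RuntimeError as exc:
    print(exc)  # Worker run for batch_id 2 ended in state Failed
```

Called inside the parent `@flow`, the `RuntimeError` propagates out of the flow function, so Prefect marks the parent flow run as Failed and the loop exits, which matches the target behaviour described above.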