# ask-marvin
m
@Marvin

Subject: Issue with Error Handling in Subflows with Prefect Cloud 2.19.2

Problem Description:
I am using Prefect Cloud 2.19.2 and have a global flow (`assets_creator`) that launches multiple subflows (`single_file_creator`) in parallel using `asyncio.gather()`. Each subflow, in turn, calls the `create_file` task using `.map()` with two different inputs.

Code Structure:
- The global flow `assets_creator` starts multiple subflows concurrently using `asyncio.gather()`.
- Each subflow `single_file_creator` runs the `create_file` task in parallel using `.map()`.
- The `create_file` task intentionally fails when the filename is `"mps-report_dettaglio_produzione.txt"`, returning `Failed()`.

Issue Observed:
I expected that if a task or subflow fails, it would not impact the other running subflows. However, when a task or subflow fails, the failure propagates to the global flow (`assets_creator`), causing it to stop execution and crash the remaining subflows.

Question:
How can I handle failures in a way that prevents a single task or subflow failure from stopping the entire global flow? Is there a recommended approach, or a configuration, for launching parallel/concurrent subflows and tasks so that the other subflows continue running independently of failures?

Code:

```python
import os
import asyncio
import random
import time
from datetime import datetime

from prefect import flow, task, get_run_logger
from prefect.states import Failed
from prefect.runtime import flow_run, task_run


def generate_flow_run_name():
    return flow_run.parameters["asset_id"]


def generate_task_name():
    return task_run.parameters["filename"]


@task(task_run_name=generate_task_name)
def create_file(directory: str, filename: str, content: str):
    """
    Create a file in the given directory with the given content.
    Intentionally fails on one specific file.
    """
    logger = get_run_logger()
    if filename == "mps-report_dettaglio_produzione.txt":
        time.sleep(15)
        return Failed()
    else:
        sleep_time = random.randint(10, 30)  # random duration between 10 and 30 seconds
        time.sleep(sleep_time)
        os.makedirs(directory, exist_ok=True)  # create the directory if it does not exist
        filepath = os.path.join(directory, filename)
        with open(filepath, "w") as file:
            file.write(content)
        logger.info(f"File created: {filepath}")


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    """
    Subflow that handles file creation for a single asset_id.
    """
    logger = get_run_logger()
    logger.info(f"Starting single_file_creator for asset '{asset_id}'.")
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    create_file.map([directory] * len(filenames), filenames, contents)
    logger.info(f"Completed single_file_creator for asset '{asset_id}'.")


@flow(flow_run_name="global")  # , retries=3, retry_delay_seconds=120
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows)


if __name__ == "__main__":
    asset_ids = [
        "mps-report_sintetico",
        "mps-report_dettaglio",
        "mps-report_ciao",
        "mps-saluti",
        "mps-mazzini",
        "mps-pochettini",
    ]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
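For context, below is a minimal sketch of the kind of isolation I am after. It is a hypothetical rework of my two flows above, and it assumes Prefect 2.x semantics that I have not fully verified: that calling a flow or task (including `.map()`) with `return_state=True` returns the final `State` instead of raising on failure, that a flow which returns a non-None value is marked `Completed` regardless of failed task runs inside it, and that `asyncio.gather(..., return_exceptions=True)` collects exceptions instead of propagating the first one to the caller.

```python
import os
import asyncio
from datetime import datetime

from prefect import flow, get_run_logger

# create_file and generate_flow_run_name as defined in the snippet above.


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    logger = get_run_logger()
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [f"Asset: {asset_id}, Time: {upload_time}", f"Asset: {asset_id}, Info: 1.o"]

    # return_state=True (assumed available on .map in 2.x) hands back State
    # objects, so a failed mapped task is observed here instead of failing
    # the subflow outright.
    states = create_file.map(
        [directory] * len(filenames), filenames, contents, return_state=True
    )
    failed = [s for s in states if s.is_failed()]
    if failed:
        logger.warning(f"{len(failed)} file(s) failed for asset '{asset_id}'.")
    # Returning an explicit value so the subflow's own final state is not
    # derived from its (partially failed) task runs.
    return f"{len(filenames) - len(failed)}/{len(filenames)} files created"


@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    logger = get_run_logger()
    subflows = [
        # return_state=True: a failed subflow should come back as a Failed
        # State rather than raising into the global flow.
        single_file_creator(asset_id, upload_time, return_state=True)
        for asset_id in asset_ids
    ]
    # return_exceptions=True: if a subflow does raise anyway, gather collects
    # the exception instead of aborting the sibling subflows.
    results = await asyncio.gather(*subflows, return_exceptions=True)
    for asset_id, result in zip(asset_ids, results):
        if isinstance(result, BaseException):
            logger.warning(f"Subflow '{asset_id}' raised: {result!r}")
        elif result.is_failed():
            logger.warning(f"Subflow '{asset_id}' ended in state {result.name}")
```

If there is a more idiomatic mechanism for this (for example `allow_failure`, or some runner/task-runner configuration) I would prefer that over hand-rolled state checks.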
m
Your message was too long by 389 tokens...