@Marvin, Subject: Issue with Error Handling in Subflows with Prefect Cloud 2.19.2
Problem Description:
I am using Prefect Cloud 2.19.2 and have a global flow (assets_creator) that launches multiple subflows (single_file_creator) in parallel using asyncio.gather().
Each subflow, in turn, calls the create_file task using .map() with two different inputs.
Code Structure:
The global flow assets_creator starts multiple subflows concurrently using asyncio.gather().
Each subflow single_file_creator runs the create_file task in parallel using .map().
The create_file task intentionally fails when the filename is "mps-report_dettaglio_produzione.txt", returning Failed().
Issue Observed:
I expected that if a task or subflow fails, it would not impact the other running subflows.
However, when a task or subflow fails, the failure propagates to the global flow (assets_creator), causing it to stop execution and crash the remaining subflows.
Question:
How can I handle failures in a way that prevents a single task or subflow failure from stopping the entire global flow?
Is there a recommended approach for launching parallel/concurrent subflows and tasks, or a configuration option to ensure that the other subflows continue running independently of failures?
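For reference, this is the kind of change I was imagining at the parent level, but I have not verified it (untested sketch; I am assuming that calling a subflow with return_state=True returns its final State instead of raising, and that return_exceptions=True on asyncio.gather() stops one exception from cancelling the other coroutines):

@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    logger = get_run_logger()
    # return_state=True should hand back a Failed State rather than raising
    # into this coroutine when a subflow fails.
    subflows = [
        single_file_creator(asset_id, upload_time, return_state=True)
        for asset_id in asset_ids
    ]
    # return_exceptions=True keeps a single raised exception from cancelling
    # the remaining gathered subflow coroutines.
    states = await asyncio.gather(*subflows, return_exceptions=True)
    for asset_id, state in zip(asset_ids, states):
        if isinstance(state, Exception) or state.is_failed():
            logger.warning(f"Subflow for '{asset_id}' did not complete: {state}")

Is this the right direction, or is there a more idiomatic way?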
Code:
import os
import asyncio
import random
import time
from datetime import datetime
from prefect import flow, task, get_run_logger
from prefect.states import Failed
from prefect.runtime import flow_run, task_run

def generate_flow_run_name():
    return flow_run.parameters["asset_id"]

def generate_task_name():
    return task_run.parameters["filename"]
@task(task_run_name=generate_task_name)
def create_file(directory: str, filename: str, content: str):
    """
    Create a file in the specified directory with the given content.
    Introduces an intentional failure for one of the files.
    """
    logger = get_run_logger()
    if filename == "mps-report_dettaglio_produzione.txt":
        time.sleep(15)
        return Failed()
    else:
        sleep_time = random.randint(10, 30)  # random delay between 10 and 30 seconds
        time.sleep(sleep_time)
        os.makedirs(directory, exist_ok=True)  # create the directory if it does not exist
        filepath = os.path.join(directory, filename)
        with open(filepath, "w") as file:
            file.write(content)
        logger.info(f"File created: {filepath}")
@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    """
    Subflow that handles file creation for a single asset_id.
    """
    logger = get_run_logger()
    logger.info(f"Starting single_file_creator for asset '{asset_id}'.")
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    create_file.map(
        [directory] * len(filenames),
        filenames,
        contents,
    )
    logger.info(f"Completed single_file_creator for asset '{asset_id}'.")
@flow(flow_run_name="global") #, retries=3, retry_delay_seconds=120
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
await asyncio.gather(*subflows)
if __name__ == "__main__":
    asset_ids = [
        "mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao",
        "mps-saluti", "mps-mazzini", "mps-pochettini",
    ]
    asyncio.run(assets_creator(asset_ids=asset_ids))
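Inside the subflow, I also considered waiting on the mapped futures explicitly and inspecting their states, so that a single failed create_file run does not automatically fail the whole subflow. This is equally untested (I am assuming PrefectFuture.wait() returns the task run's final State in 2.x):

futures = create_file.map(
    [directory] * len(filenames),
    filenames,
    contents,
)
# wait() should return each task run's final State without raising, so the
# subflow can log failures and still decide its own final state.
states = [future.wait() for future in futures]
for filename, state in zip(filenames, states):
    if state.is_failed():
        logger.warning(f"create_file failed for '{filename}'")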