# ask-marvin
@Marvin: Subject: Issue with Error Handling in Subflows with Prefect Cloud 2.19.2

Problem Description: I am using Prefect Cloud 2.19.2 and have a global flow (assets_creator) that launches multiple subflows (single_file_creator) in parallel using asyncio.gather(). Each subflow, in turn, calls the create_file task using .map() with two different inputs.

Code Structure: The global flow assets_creator starts multiple subflows concurrently using asyncio.gather(). Each subflow single_file_creator runs the create_file task in parallel using .map(). The create_file task intentionally fails when the filename is "mps-report_dettaglio_produzione.txt", returning Failed().

Issue Observed: I expected that if a task or subflow fails, it would not impact the other running subflows. However, when a task or subflow fails, the failure propagates to the global flow (assets_creator), causing it to stop execution and crash the remaining subflows.

Question: How can I handle failures in a way that prevents a single task or subflow failure from stopping the entire global flow? Is there a recommended approach or configuration for launching parallel/concurrent subflows and tasks so that the other subflows continue running independently of failures?
To handle failures in subflows without impacting the global flow in Prefect, you can use a few strategies:

1. Error handling in subflows: Prefect allows you to manage task and subflow failures without propagating them to the global flow. You can use `try/except` blocks within your subflows to catch exceptions and handle them gracefully, logging the error or returning a specific state without affecting the parent flow.
2. Use of `allow_failure`: Prefect provides the `allow_failure` annotation, which can wrap upstream task results or futures so that downstream work still runs even if an upstream task failed. This is useful if certain failures should not block the rest of the flow.
3. State management: By setting `raise_on_failure=False` when resolving results and checking task states explicitly, you can decide how to handle each task's outcome. This allows you to continue the flow even if some tasks fail.
4. Concurrency management: When using `asyncio.gather()`, you can set `return_exceptions=True` to gather results from all coroutines, even if some raise exceptions. This prevents the entire operation from stopping due to a single failure.

For more details, you can refer to the Prefect documentation on error handling and state management. Given your preference for an imperative programming style and integration with PyO3, you might find it beneficial to handle exceptions explicitly within your code logic to maintain control over flow execution. If you need further guidance, you can explore the Prefect API for functions like `allow_failure` or `suspend_flow_run` to manage flow states effectively. If you have any more specific scenarios or need code examples, feel free to ask!
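For example, a minimal sketch of the parent-flow side of points 3 and 4, assuming Prefect 2.x (the `single_file_creator` stub here is only a hypothetical stand-in for your real subflow):
```python
import asyncio

from prefect import flow, get_run_logger
from prefect.states import State


@flow
async def single_file_creator(asset_id: str) -> None:
    # Hypothetical stand-in for the real subflow: simulate one failing asset.
    if asset_id.endswith("_dettaglio"):
        raise RuntimeError(f"simulated failure for {asset_id}")


@flow
async def assets_creator(asset_ids: list[str]) -> None:
    logger = get_run_logger()

    # return_state=True makes each subflow call resolve to its final State
    # instead of raising an exception if the subflow fails.
    coros = [single_file_creator(asset_id, return_state=True) for asset_id in asset_ids]

    # return_exceptions=True keeps one failing coroutine from cancelling the others.
    results = await asyncio.gather(*coros, return_exceptions=True)

    for asset_id, result in zip(asset_ids, results):
        if isinstance(result, Exception):
            logger.error(f"Subflow for {asset_id} raised: {result!r}")
        elif isinstance(result, State) and result.is_failed():
            logger.warning(f"Subflow for {asset_id} ended in state {result.name}")


if __name__ == "__main__":
    asyncio.run(assets_creator(["mps-report_sintetico", "mps-report_dettaglio"]))
```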
@Marvin this is the code:
```python
import os
import asyncio
import random
import time
from datetime import datetime
from prefect import flow, task, get_run_logger
from prefect.states import Failed
from prefect.runtime import flow_run, task_run


def generate_flow_run_name():
    return flow_run.parameters["asset_id"]


def generate_task_name():
    return task_run.parameters["filename"]


@task(task_run_name=generate_task_name)
def create_file(directory: str, filename: str, content: str):
    """
    Creates a file in the given directory with the given content.
    Introduces an intentional error on one of the files.
    """
    logger = get_run_logger()
    if filename == "mps-report_dettaglio_produzione.txt":
        time.sleep(15)
        return Failed()
    else:
        sleep_time = random.randint(10, 30)  # random number of seconds
        time.sleep(sleep_time)
        os.makedirs(directory, exist_ok=True)  # create the directory if it does not exist
        filepath = os.path.join(directory, filename)
        with open(filepath, "w") as file:
            file.write(content)
        logger.info(f"File created: {filepath}")


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    """
    Subflow that handles file creation for a single asset_id.
    """
    logger = get_run_logger()
    logger.info(f"Starting single_file_creator for asset '{asset_id}'.")
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    create_file.map(
        [directory] * len(filenames),
        filenames,
        contents
    )
    logger.info(f"Completed single_file_creator for asset '{asset_id}'.")


@flow(flow_run_name="global")  # , retries=3, retry_delay_seconds=120
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows)


if __name__ == "__main__":
    asset_ids = ["mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao",
                 "mps-saluti", "mps-mazzini", "mps-pochettini"]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
--> how do you solve the problem described in the previous question?
Your message was too long by 150 tokens...
@Marvin this is my code -->
```python
import os
import asyncio
import random
import time
from datetime import datetime
from prefect import flow, task, get_run_logger
from prefect.states import Failed
from prefect.runtime import flow_run, task_run


def generate_flow_run_name():
    return flow_run.parameters["asset_id"]


def generate_task_name():
    return task_run.parameters["filename"]


@task(task_run_name=generate_task_name)
def create_file(directory: str, filename: str, content: str):
    logger = get_run_logger()
    os.makedirs(directory, exist_ok=True)  # create the directory if it does not exist
    filepath = os.path.join(directory, filename)
    with open(filepath, "w") as file:
        file.write(content)
    logger.info(f"File created: {filepath}")


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    logger = get_run_logger()
    logger.info(f"Starting single_file_creator for asset '{asset_id}'.")
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    create_file.map(
        [directory] * len(filenames),
        filenames,
        contents
    )
    logger.info(f"Completed single_file_creator for asset '{asset_id}'.")


@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows)


if __name__ == "__main__":
    asset_ids = ["mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao",
                 "mps-saluti", "mps-mazzini", "mps-pochettini"]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
---> how do you solve the problem described in the previous question in the chat?
Your message was too long by 31 tokens...
@Marvin this is my code -->
```python
import os
import asyncio
import random
import time
from datetime import datetime
from prefect import flow, task, get_run_logger
from prefect.states import Failed
from prefect.runtime import flow_run, task_run


def generate_flow_run_name():
    return flow_run.parameters["asset_id"]


def generate_task_name():
    return task_run.parameters["filename"]


@task(task_run_name=generate_task_name)
def create_file(directory: str, filename: str, content: str):
    os.makedirs(directory, exist_ok=True)  # create the directory if it does not exist
    filepath = os.path.join(directory, filename)
    with open(filepath, "w") as file:
        file.write(content)


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    create_file.map(
        [directory] * len(filenames),
        filenames,
        contents
    )


@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows)


if __name__ == "__main__":
    asset_ids = ["mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao"]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
---> how do you solve the problem described in the previous question in the chat?
Your code imports are correct. To address the issue of preventing a single task or subflow failure from stopping the entire global flow, you can modify your code to include error handling within the `single_file_creator` flow. Here's how you can adjust it:

1. Wrap the task submission in a `try/except` block: this catches exceptions raised while submitting the mapped `create_file` runs. Note that failures inside the mapped task runs themselves surface through the futures and states that `.map()` returns, rather than as an exception at the `.map()` call, so the other subflows can keep running.
2. Use `return_exceptions=True` in `asyncio.gather()`: this allows `asyncio.gather()` to collect exceptions as part of its results rather than stopping execution when an exception occurs.

Here's how you can modify your `single_file_creator` and `assets_creator` flows:
```python
@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    logger = get_run_logger()
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    try:
        create_file.map(
            [directory] * len(filenames),
            filenames,
            contents
        )
    except Exception as e:
        logger.error(f"Error in subflow for asset_id {asset_id}: {e}")
        return Failed()

@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows, return_exceptions=True)

if __name__ == "__main__":
    asset_ids = ["mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao"]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
Key Changes:
- Added a `try/except` block in `single_file_creator` to handle exceptions.
- Used `return_exceptions=True` in `asyncio.gather()` to collect exceptions without stopping the execution of other subflows.

This should help prevent a failure in one subflow from affecting the execution of the others. Let me know if you need further assistance!
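If you also want the subflow itself to see which mapped runs failed, one option is to resolve the futures that `.map()` returns and inspect their final states. A minimal sketch of that variant, assuming Prefect 2.x (where `PrefectFuture.wait()` returns the run's final `State`):
```python
@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    logger = get_run_logger()
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]

    # .map() submits the task runs and returns one PrefectFuture per run;
    # a failure inside a run does not raise an exception here.
    futures = create_file.map([directory] * len(filenames), filenames, contents)

    # Resolve each future to its final state instead of raising on failure.
    for filename, future in zip(filenames, futures):
        state = future.wait()
        if state is not None and state.is_failed():
            logger.warning(f"create_file failed for {filename}: {state.message}")
```
This keeps the failure visible in the logs and in the task run's state without re-raising it inside the subflow.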
@Marvin, here, how would you insert the try/except around the `.map()`? -->
```python
@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()

    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)

    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)

    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)

    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)

    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)

    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
To incorporate error handling with `.map()`, the `try/except` belongs inside the task itself: since `.map()` submits the `run_uploader` runs for parallel execution, failures surface in the individual task runs rather than at the `.map()` call. Here's how you can modify the `run_uploader` task to include error handling:

1. Modify the `run_uploader` task: add a `try/except` block inside the task to catch and log exceptions.
2. Log errors: use the logger to log any exceptions that occur during the task execution.

Here's an example of how you can achieve this:
```python
from prefect import flow, task, get_run_logger

@task
def run_uploader(uploader):
    logger = get_run_logger()
    try:
        # Your uploader logic here
        pass  # Replace with actual logic
    except Exception as e:
        logger.error(f"Error in uploader task: {e}")
        raise  # Optionally re-raise the exception if you want to fail the task

@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()

    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)

    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)

    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)

    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)

    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)

    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Key Points:
- The `run_uploader` task now includes error handling, logging any exceptions that occur.
- The `raise` statement within the `except` block is optional. If you want the task to fail and propagate the failure, keep it; otherwise, remove it to allow the task to complete with a logged error.

This approach ensures that each uploader task handles its own errors, allowing the flow to continue executing other tasks.
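If you do keep the `raise` (so failed uploads show up as Failed task runs) but still want a downstream step like `create_ok_file` to run regardless, `allow_failure` can wrap the upstream futures passed to `wait_for`. A small self-contained sketch, assuming Prefect 2.x; the tasks here are hypothetical stand-ins, not your real `run_uploader` and `create_ok_file`:
```python
from prefect import allow_failure, flow, get_run_logger, task


@task
def run_uploader(uploader: str) -> None:
    # Hypothetical stand-in for the real uploader logic.
    if uploader == "bad":
        raise RuntimeError(f"upload failed for {uploader!r}")


@task
async def create_ok_file(data_contract: str) -> None:
    get_run_logger().info(f"creating .ok file for {data_contract}")


@flow
async def single_asset_uploader(asset_id: str) -> None:
    uploaders = ["good", "bad", "also-good"]  # hypothetical inputs
    upload_tasks = run_uploader.map(uploaders)

    # allow_failure lets the downstream task run even if an upstream
    # future it waits on resolves to a Failed state.
    await create_ok_file(
        data_contract=asset_id,
        wait_for=[allow_failure(t) for t in upload_tasks],
    )
```
Whether that is desirable depends on your semantics: if the .ok file should only appear when every upload succeeded, checking the upstream states explicitly is the safer choice.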
@Marvin this is the entire code -->
```python
@task(task_run_name=generate_task_name)
def run_uploader(uploader: Uploader):
    uploader.load()


@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()

    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)

    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)

    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)

    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)

    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)

    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
To incorporate error handling within the `run_uploader` task, you can modify the task to include a `try/except` block. This allows each uploader to handle its own errors without affecting the others. Here's how you can adjust the code:
```python
from prefect import flow, task, get_run_logger
from prefect.states import Failed

@task(task_run_name=generate_task_name)
def run_uploader(uploader: Uploader):
    logger = get_run_logger()
    try:
        uploader.load()
    except Exception as e:
        logger.error(f"Error in uploader task for {uploader}: {e}")
        # Optionally, you can return a failed state or custom message
        return Failed(message=f"Uploader failed: {e}")

@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()

    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)

    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)

    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)

    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)

    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)

    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Key Changes:
- Added a `try/except` block inside the `run_uploader` task to catch and log any exceptions that occur during the `uploader.load()` execution.
- Logged the error with a descriptive message to help identify which uploader encountered an issue.
- Optionally, you can return a `Failed` state or a custom message if you want to handle the failure explicitly in the flow logic.

This approach ensures that each uploader task can fail independently without halting the execution of other tasks or the overall flow.
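Alternatively, if you want the flow itself to decide what to do with failed uploads, you can resolve the mapped futures and, for example, only create the .ok file when every uploader succeeded. A sketch of that variant using the names from your flow, again assuming Prefect 2.x (where `PrefectFuture.wait()` returns the run's final state):
```python
@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()

    data_contract = get_data_contract(asset_id)
    await remove_ok_file(data_contract)
    await data_dock_to_data_gateway(data_contract, schedule_id)

    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    upload_tasks = run_uploader.map(uploaders)

    # Resolve each mapped future to its final state without raising.
    states = [t.wait() for t in upload_tasks]
    failed = [s for s in states if s is not None and s.is_failed()]

    if failed:
        logger.warning(f"{len(failed)} uploader task(s) failed for asset '{asset_id}'; skipping the .ok file")
    else:
        await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```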