MarcoM
02/27/2025, 2:50 PM

Marvin
02/27/2025, 2:50 PM
1. Try/Except Blocks: You can use `try/except` blocks within your subflows to catch exceptions and handle them gracefully. This way, you can log the error or return a specific state without affecting the parent flow.
2. Use of `allow_failure`: Prefect provides the `allow_failure` annotation, which you can wrap around upstream task results or futures so that downstream work still runs even when an upstream task fails. This can be useful if you want certain failures not to block the rest of the flow (see the sketch after this list).
3. State Management: By retrieving results with `raise_on_failure=False` and checking task states explicitly, you can decide how to handle each task's outcome. This allows you to continue the flow even if some tasks fail.
4. Concurrency Management: When using `asyncio.gather()`, you can set `return_exceptions=True` to gather results from all coroutines, even if some raise exceptions. This prevents the entire operation from stopping due to a single failure.
For more details, you can refer to the Prefect documentation on error handling and state management.
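As a concrete illustration of points 2 and 3, here is a minimal sketch (assuming the Prefect 2.x API; `extract_data` and `load_data` are hypothetical tasks used only for this example):
```python
from prefect import allow_failure, flow, get_run_logger, task


@task
def extract_data():
    # Hypothetical upstream task that fails.
    raise ValueError("source unavailable")


@task
def load_data(payload):
    # Hypothetical downstream task.
    return payload


@flow
def pipeline():
    logger = get_run_logger()
    upstream = extract_data.submit()

    # allow_failure lets the downstream task run even though its upstream failed.
    load_data.submit(allow_failure(upstream))

    # raise_on_failure=False returns the exception instead of raising it,
    # so the flow can inspect the outcome and keep going.
    outcome = upstream.result(raise_on_failure=False)
    if isinstance(outcome, Exception):
        logger.warning(f"extract_data failed: {outcome}")
```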
Given your preference for an imperative programming style and integration with PyO3, you might find it beneficial to handle exceptions explicitly within your code logic to maintain control over flow execution.
If you need further guidance, you can explore the Prefect API for functions like `allow_failure` or `suspend_flow_run` to manage flow states effectively.
If you have any more specific scenarios or need code examples, feel free to ask!

MarcoM
02/27/2025, 2:52 PM

MarcoM
02/27/2025, 2:53 PM

Marvin
02/27/2025, 2:53 PM

MarcoM
02/27/2025, 2:55 PM

Marvin
02/27/2025, 2:55 PM

MarcoM
02/27/2025, 2:56 PM

Marvin
02/27/2025, 2:57 PM
You can handle this within your `single_file_creator` flow. Here's how you can adjust it:
1. Wrap the Task Execution in a `try/except` Block: This will ensure that any exceptions raised by the `create_file` task are caught and handled, allowing other subflows to continue execution.
2. Use `return_exceptions=True` in `asyncio.gather()`: This will allow `asyncio.gather()` to collect exceptions as part of its results, rather than stopping execution when an exception occurs.
Here's how you can modify your `single_file_creator` and `assets_creator` flows:
```python
import asyncio
import os
from datetime import datetime

from prefect import flow, get_run_logger
from prefect.states import Failed

# Assumes generate_flow_run_name and the create_file task are defined elsewhere in your project.


@flow(flow_run_name=generate_flow_run_name)
async def single_file_creator(asset_id: str, upload_time: datetime = datetime.now()):
    logger = get_run_logger()
    directory = os.path.join("C:\\temp\\test_retry", upload_time.strftime("%Y-%m-%d_%H-%M-%S"))
    filenames = [f"{asset_id}_produzione.txt", f"{asset_id}_dataeng.txt"]
    contents = [
        f"Asset: {asset_id}, Time: {upload_time}",
        f"Asset: {asset_id}, Info: 1.o",
    ]
    try:
        create_file.map(
            [directory] * len(filenames),
            filenames,
            contents,
        )
    except Exception as e:
        logger.error(f"Error in subflow for asset_id {asset_id}: {e}")
        return Failed()


@flow(flow_run_name="global")
async def assets_creator(asset_ids: list[str], upload_time: datetime = datetime.now()) -> None:
    subflows = [single_file_creator(asset_id, upload_time) for asset_id in asset_ids]
    await asyncio.gather(*subflows, return_exceptions=True)


if __name__ == "__main__":
    asset_ids = ["mps-report_sintetico", "mps-report_dettaglio", "mps-report_ciao"]
    asyncio.run(assets_creator(asset_ids=asset_ids))
```
Key Changes:
- Added a `try/except` block in `single_file_creator` to handle exceptions.
- Used `return_exceptions=True` in `asyncio.gather()` to collect exceptions without stopping the execution of other subflows.
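One caveat on the `try/except` change above: `create_file.map(...)` returns a list of futures, so exceptions raised inside the mapped tasks generally surface when their results are resolved rather than at the `.map()` call itself. If you want the subflow's outcome to reflect those failures, a sketch of an alternative (assuming the Prefect 2.x futures API, and reusing `directory`, `filenames`, `contents`, and `logger` from the flow above) is to inspect the mapped task states directly:
```python
# Inside single_file_creator, in place of the try/except block:
futures = create_file.map(
    [directory] * len(filenames),
    filenames,
    contents,
)

failed_states = []
for future in futures:
    state = future.wait()  # block until this mapped task run finishes
    if state.is_failed():
        failed_states.append(state)

if failed_states:
    logger.error(f"{len(failed_states)} file-creation task(s) failed for asset_id {asset_id}")
    return Failed(message=f"{len(failed_states)} file-creation task(s) failed")  # Failed from prefect.states
```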
This should help prevent a failure in one subflow from affecting the execution of others. Let me know if you need further assistance!

MarcoM
02/27/2025, 2:59 PM
```python
@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Marvin
02/27/2025, 2:59 PM
To handle errors for tasks run with the `.map()` method, you can wrap the execution of the `run_uploader` task in a `try/except` block. However, since `.map()` is used for parallel task execution, you'll need to handle exceptions within the task itself. Here's how you can modify the `run_uploader` task to include error handling:
1. Modify the `run_uploader` Task: Add a `try/except` block inside the task to catch and log exceptions.
2. Log Errors: Use the logger to log any exceptions that occur during the task execution.
Here's an example of how you can achieve this:
```python
from prefect import flow, task, get_run_logger


@task
def run_uploader(uploader):
    logger = get_run_logger()
    try:
        # Your uploader logic here
        pass  # Replace with actual logic
    except Exception as e:
        logger.error(f"Error in uploader task: {e}")
        raise  # Optionally re-raise the exception if you want to fail the task


@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Key Points:
- The `run_uploader` task now includes error handling, logging any exceptions that occur.
- The `raise` statement within the `except` block is optional. If you want the task to fail and propagate the failure, you can keep it; otherwise, you can remove it to allow the task to complete with a logged error.
This approach ensures that each uploader task handles its own errors, allowing the flow to continue executing other tasks.

MarcoM
02/27/2025, 3:02 PM
```python
@task(task_run_name=generate_task_name)
def run_uploader(uploader: Uploader):
    uploader.load()


@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Marvin
02/27/2025, 3:02 PM
To handle errors within the `run_uploader` task, you can modify the task to include a `try/except` block. This will allow each uploader to handle its own errors without affecting the others. Here's how you can adjust the code:
```python
from prefect import flow, task, get_run_logger
from prefect.states import Failed


@task(task_run_name=generate_task_name)
def run_uploader(uploader: Uploader):
    logger = get_run_logger()
    try:
        uploader.load()
    except Exception as e:
        logger.error(f"Error in uploader task for {uploader}: {e}")
        # Optionally, you can return a failed state or custom message
        return Failed(message=f"Uploader failed: {e}")


@flow(flow_run_name=generate_flow_run_name)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
```
Key Changes:
- Added a `try/except` block inside the `run_uploader` task to catch and log any exceptions that occur during the `uploader.load()` execution.
- Logged the error with a descriptive message to help identify which uploader encountered an issue.
- Optionally, you can decide to return a `Failed` state or a custom message if you want to handle the failure explicitly in the flow logic.
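One additional note, hedged because it depends on Prefect 2.x state semantics: returning a `Failed` state from `run_uploader` marks that task run as failed, so `create_ok_file`, which waits on `upload_tasks` via `wait_for`, would typically not run when any uploader fails. If you want the `.ok` file created regardless, one option is to wrap those dependencies with `allow_failure`, for example:
```python
from prefect import allow_failure

# Sketch: let create_ok_file run even if some uploader task runs ended as Failed.
upload_tasks = run_uploader.map(uploaders)
await create_ok_file(
    wait_for=[allow_failure(t) for t in upload_tasks],
    data_contract=data_contract,
)
```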
This approach ensures that each uploader task can fail independently without halting the execution of other tasks or the overall flow.