MarcoM
12/11/2024, 4:28 PM
Marvin
12/11/2024, 4:29 PM
python
from prefect import task, flow
from prefect.states import Completed, Failed

# get_data_contract, get_uploaders_from_data_contract, create_ok_file and
# `datetime` are assumed to come from your own project code.

@task
def run_uploader(uploader):
    uploader.load()

@flow
async def single_asset_uploader(asset_id: str):
    data_contract = get_data_contract(asset_id)
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    upload_tasks = run_uploader.map(uploaders)
    # Wait for every mapped task run to finish before reading its state
    upload_tasks.wait()
    # Check if any task failed
    if any(future.state.is_failed() for future in upload_tasks):
        return Failed(message="One or more tasks failed.")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
    return Completed(message="All tasks completed successfully.")
This code assumes you're using Prefect 3.x. Calling `.state.is_failed()` on each mapped task run is a way to explicitly check if any task has failed.
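The wait-then-inspect idea is not specific to Prefect. As a rough analogy using only the standard library (this is not Prefect's API, and `load_ok` and `load_broken` are made-up stand-ins for `uploader.load()`), the same pattern with `concurrent.futures` might look like this:

```python
from concurrent.futures import ThreadPoolExecutor, wait


def load_ok() -> str:
    """Hypothetical uploader that succeeds."""
    return "uploaded"


def load_broken() -> str:
    """Hypothetical uploader that fails."""
    raise RuntimeError("upload failed")


def any_upload_failed(loaders) -> bool:
    """Run every loader concurrently, wait for all of them, then check for failures."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(loader) for loader in loaders]
        # Block until every future is done; checking earlier could miss a failure.
        wait(futures)
        return any(f.exception() is not None for f in futures)
```

The point of the sketch is the ordering: submit everything, wait for everything, and only then decide whether the overall run counts as failed.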
### Additional Resources:
- Prefect 3.x handles terminal state determination, and the flow's return value directly impacts whether it is considered `Completed`, `Failed`, or `Crashed`. For more details, see the Prefect 3.x manage states documentation and Prefect 3.x upgrade guide.
- Prefect 2.x also allows a combination of states and the final state of the flow run depends on the resolution of these states. Relevant 2.x documentation includes concepts of flows and strategies during migration to Prefect 3.0.
If you're not running a specific Prefect version, these examples give a general idea of how you can manage states and ensure failure is properly handled in your workflows.
MarcoM
12/11/2024, 4:32 PM
MarcoM
12/11/2024, 4:32 PM
Marvin
12/11/2024, 4:33 PM
`Failed` state if any task fails, you can check the states of the tasks directly and use the `return_state` parameter to manage flow results. Here's how you can achieve that:
python
from prefect import task, flow
from prefect.states import Completed, Failed

# get_data_contract, get_uploaders_from_data_contract, create_ok_file and
# `datetime` are assumed to come from your own project code.

@task
def run_uploader(uploader):
    uploader.load()

@flow
def single_asset_uploader(asset_id: str):
    data_contract = get_data_contract(asset_id)
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    # Submit uploader tasks and collect their futures
    upload_futures = [run_uploader.submit(uploader) for uploader in uploaders]
    # In Prefect 2.x, future.wait() blocks and returns the final State
    task_states = [future.wait() for future in upload_futures]
    # Check if any task failed
    if any(state.is_failed() for state in task_states):
        return Failed(message="One or more tasks failed while uploading.")
    create_ok_file(data_contract=data_contract)
    return Completed(message="All tasks completed successfully.")

if __name__ == "__main__":
    single_asset_uploader("your_asset_id")
### Key Points
1. State Management: We capture each task's final state with `.wait()`, which in Prefect 2.x returns the `State` of the task run, and check if any task failed using `state.is_failed()`. (In Prefect 3.x, `.wait()` returns nothing, so call it first and then read `future.state`.)
2. Return State: The flow manually returns a `Failed` state if any of the task states indicate a failure.
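To make the decision rule concrete, here is a minimal sketch that runs without Prefect. `FakeState` is a hypothetical stand-in that only mimics the `is_failed()` check used above (plus `is_crashed()`, which Prefect states also expose), and `summarize` is an illustrative helper, not part of any library:

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for a task run state."""
    type: str  # "COMPLETED", "FAILED" or "CRASHED"

    def is_failed(self) -> bool:
        return self.type == "FAILED"

    def is_crashed(self) -> bool:
        return self.type == "CRASHED"


def summarize(states) -> str:
    """Report failure if any task run failed or crashed, success otherwise."""
    bad = [s for s in states if s.is_failed() or s.is_crashed()]
    if bad:
        return f"Failed: {len(bad)} of {len(states)} task runs did not complete."
    return "Completed: all task runs completed successfully."
```

In a real flow, the two branches would correspond to returning `Failed(message=...)` or `Completed(message=...)`.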
For more details about using states in Prefect 2.x, you can refer to the Prefect 2.x flow concepts documentation.
I hope this helps your flow run like a well-oiled machine... or at least as close as we can get in the thrilling world of asynchronous task management!
MarcoM
12/11/2024, 4:34 PM
Marvin
12/11/2024, 4:35 PM
python
from prefect import task, flow

@task(retries=3, retry_delay_seconds=10)
def run_uploader(uploader):
    # SomeFailureCondition is a placeholder for whatever marks a bad uploader in your code
    if isinstance(uploader, SomeFailureCondition):
        # A raised exception puts the task run in a Failed state, which triggers a retry
        raise Exception("Simulating a crash!")
    uploader.load()

@flow
def single_asset_uploader(asset_id: str):
    data_contract = get_data_contract(asset_id)
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    upload_futures = [run_uploader.submit(uploader) for uploader in uploaders]
    # Wait for tasks to complete; retries happen automatically inside each task run
    for future in upload_futures:
        future.wait()  # Or handle the future object for more control if needed

if __name__ == "__main__":
    single_asset_uploader("your_asset_id")
### Explanation:
- Retries Setup: We set up the task with `retries=3` to allow up to three retries when the task raises an exception and ends in a `Failed` state.
- Delay Between Retries: `retry_delay_seconds=10` sets a 10-second delay between each retry attempt.
### Additional Pointers:
- For tasks to be retried automatically on failure, ensure that exceptions are properly raised.
- If specific crash situations need different handling, consider using a custom `retry_condition_fn`.
If more specific criteria are involved in identifying a task status as "crashed" or if you want to ensure retries only in certain failed scenarios, check out the relevant Prefect 2.x task concepts documentation for guidance on setting global retry configurations or using custom retry conditions.
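As a sketch of what such a custom condition could look like: Prefect calls a `retry_condition_fn` with the task, the task run, and the state, and retries only if it returns `True`. The choice of `ConnectionError` and `TimeoutError` as "transient" errors is an assumption for illustration, and the function name is made up:

```python
def retry_on_transient_error(task, task_run, state) -> bool:
    """Return True only when the failure looks transient (illustrative rule)."""
    try:
        # For a failed run, result() re-raises the exception that caused the failure.
        state.result()
    except (ConnectionError, TimeoutError):
        return True  # assumed transient: allow a retry
    except Exception:
        return False  # anything else: do not retry
    return False  # no exception means there is nothing to retry


# Usage with Prefect (not executed here):
# @task(retries=3, retry_delay_seconds=10, retry_condition_fn=retry_on_transient_error)
# def run_uploader(uploader):
#     uploader.load()
```

Keeping the rule in a plain function makes it easy to test with a stub state object before attaching it to a task.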
And remember, even in the chaos of managing retries, there's always room for a bit of zen and order in your task scheduling!
MarcoM
12/11/2024, 4:38 PM
MarcoM
12/11/2024, 4:38 PM
Marvin
12/11/2024, 4:38 PM
python
from prefect import task, flow
from prefect.states import Failed

# get_data_contract, get_uploaders_from_data_contract and `datetime`
# are assumed to come from your own project code.

@task(retries=3, retry_delay_seconds=10)
def run_uploader(uploader):
    uploader.load()

@flow
def single_asset_uploader(asset_id: str):
    data_contract = get_data_contract(asset_id)
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    upload_futures = [run_uploader.submit(uploader) for uploader in uploaders]
    # In Prefect 2.x, future.wait() blocks and returns the final State
    final_states = [future.wait() for future in upload_futures]
    # Treat crashed task runs like failed ones when deciding the flow's state
    if any(state.is_crashed() or state.is_failed() for state in final_states):
        return Failed(message="One or more uploader tasks crashed or failed.")

if __name__ == "__main__":
    single_asset_uploader("your_asset_id")
### Explanation
- No State Handler: `state_handlers`, `prefect.engine.state` and `prefect.engine.signals` belong to the Prefect 1.x API and are not accepted by the Prefect 2.x `@task` decorator, so the example does not use them. State change hooks in 2.x run side effects only; their return value does not replace the run's state.
- Crashed Treated as Failed: The flow checks each final state with `is_crashed()` and `is_failed()` and returns a `Failed` state if either is true.
- Retries: The task retry logic is configured using the `retries` parameter and applies when the task raises an exception and its state is "failed".
### Resources
Refer to the Prefect 2.x state change hooks documentation for more details about managing state transitions.
By treating "crashed" task runs the same as "failed" ones at the flow level, the flow run reports the failure even when a task stopped without raising an ordinary exception.
MarcoM
12/11/2024, 4:44 PM
@flow(flow_run_name=generate_flow_run_name, on_crashed=)
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime = datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info(f"Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info(f"Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info(f"Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
Marvin
12/11/2024, 4:45 PM
crashed subflow by marking it as failed in Prefect 2.x, you can use a state change hook. This hook allows you to execute custom logic when your flow transitions to a `Crashed` state. A hook's return value is ignored, so to change the state you have to set it explicitly through the Prefect client.
Here's how you can implement this logic in your flow:
python
import logging
from datetime import datetime

from prefect import flow, get_client, get_run_logger
from prefect.states import Failed

async def custom_crash_handler(flow, flow_run, state):
    # on_crashed hooks only run when the flow run enters a Crashed state
    logging.getLogger(__name__).warning("Flow run crashed, marking as failed.")
    async with get_client() as client:
        # force=True asks the API to skip its orchestration rules for this transition
        await client.set_flow_run_state(
            flow_run.id,
            state=Failed(message="Flow transitioned from Crashed to Failed."),
            force=True,
        )

# on_crashed takes a list of hooks. Note that the datetime default is evaluated once, at import time.
@flow(flow_run_name=generate_flow_run_name, on_crashed=[custom_crash_handler])
async def single_asset_uploader(asset_id: str, schedule_id: int = 1, datetime=datetime.now()) -> None:
    logger = get_run_logger()
    logger.info(f"Load data contract for asset '{asset_id}'")
    data_contract = get_data_contract(asset_id)
    logger.info(f"Remove file .ok asset '{asset_id}'")
    await remove_ok_file(data_contract)
    logger.info("Import file from DataDock to DataGateway")
    await data_dock_to_data_gateway(data_contract, schedule_id)
    logger.info("Upload asset to destinations")
    uploaders = get_uploaders_from_data_contract(data_contract, datetime)
    logger.info("Run uploaders tasks")
    upload_tasks = run_uploader.map(uploaders)
    logger.info(f"Creation file .ok asset '{asset_id}'")
    await create_ok_file(wait_for=upload_tasks, data_contract=data_contract)
### Key Points:
- Custom Crash Handler: The `custom_crash_handler` function runs when the flow run enters a `Crashed` state. It logs a warning and asks the API to set the run's state to `Failed`.
- `on_crashed` Parameter: This parameter takes a list of hooks to call when the flow crashes.
- Limits: The hook can only run if the process survives long enough to call it, and relabeling a finished run as `Failed` does not by itself re-run it, so do not rely on this alone to trigger retries.
This approach means that if your subflow enters a `Crashed` state, it is reported as `Failed`, so logic that inspects failed states can treat it the same way. For more nuanced details on managing state transitions, check out the Prefect 2.x state change hooks documentation.