Tri
11/01/2024, 9:26 PM
on_cancellation handler? I could cancel the flow in the UI, but the handler doesn't run
def on_cancellation(flow, run, state):
    logger = get_run_logger()
    logger.info("cancellation handler running")

@task
def dummy_task():
    while True:
        sleep(1)
        print("sleeping...")

@flow(on_cancellation=[on_cancellation], log_prints=True)
def test_console_flow():
    dummy_task()
Marvin
11/01/2024, 9:26 PM
on_cancellation handler in a Prefect flow, I need to confirm the version of Prefect you are using, as syntax and features may differ between versions. Could you please let me know whether you are using Prefect 2.x or 3.x?
Tri
11/01/2024, 9:26 PM
Marvin
11/01/2024, 9:27 PM
on_cancellation handlers in Prefect 3.x. However, based on my knowledge, I can offer some advice:
In Prefect 3.x, cancellation handlers are attached to a flow through the on_cancellation argument of the flow decorator. Here's how you can correctly set up an on_cancellation handler:
1. Define a Cancellation Handler Function: This function will execute when cancellation is requested.
2. Flow Decorator: Make sure you pass your handler to the flow decorator through the on_cancellation argument so that it is linked to the flow.
Here's a refined way to set this up:
python
from time import sleep

from prefect import flow, task, get_run_logger


# Flow state hooks receive three arguments: the flow, the flow run,
# and the state the run is entering.
def on_cancellation(flow, flow_run, state):
    logger = get_run_logger()
    logger.info("Cancellation handler is running")


@task
def dummy_task():
    # Loop forever so the run stays active until it is cancelled.
    while True:
        sleep(1)
        print("sleeping...")


@flow(on_cancellation=[on_cancellation])
def test_console_flow():
    dummy_task()


if __name__ == '__main__':
    test_console_flow()
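Prefect documents flow state hooks with the parameter names (flow, flow_run, state). The sketch below is only an illustration of why matching those names is the safer choice: it assumes a caller that passes the arguments by keyword, which is an assumption and not a statement about Prefect's internals. The `call_hooks` dispatcher and the string stand-ins for the flow, flow run, and state are hypothetical and are not Prefect APIs.

```python
def call_hooks(hooks, flow, flow_run, state):
    """Hypothetical dispatcher (not a Prefect API) that passes arguments by keyword.

    Returns (hook name, outcome) pairs so that a failing hook is visible.
    """
    outcomes = []
    for hook in hooks:
        try:
            hook(flow=flow, flow_run=flow_run, state=state)
            outcomes.append((hook.__name__, "ok"))
        except TypeError as exc:
            # A signature that does not accept these keywords fails here.
            outcomes.append((hook.__name__, f"failed: {type(exc).__name__}"))
    return outcomes


def documented_names(flow, flow_run, state):
    # Uses the documented parameter names, so a keyword call succeeds.
    pass


def renamed_parameter(flow, run, state):
    # "run" instead of "flow_run": a keyword call cannot bind flow_run.
    pass


outcomes = call_hooks(
    [documented_names, renamed_parameter],
    flow="demo-flow",      # stand-in value, not a real Flow object
    flow_run="demo-run",   # stand-in value, not a real FlowRun object
    state="CANCELLED",     # stand-in value, not a real State object
)
print(outcomes)
```

If the caller passed the arguments positionally instead, both handlers would run, so using the documented names works in either case.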
Key Points:
- Make sure your infrastructure supports cancellation. Some infrastructures might not handle cancellation promptly or effectively.
- Ensure that the prefect package is upgraded to the latest version, as newer releases may include features and bug fixes that affect cancellation.
Unfortunately, the documentation link didn't provide specific details. If the above doesn't work, or if you face the issue consistently, looking into recent GitHub issues related to this feature would be beneficial.
For further details, you can check Prefect's official documentation or GitHub issues related to new or modified cancellation features in version 3.x at:
- Documentation link: Prefect 3.x documentation
- GitHub issues: Prefect GitHub Issues
Tri
11/01/2024, 9:31 PM
Marvin
11/01/2024, 9:31 PM
async function can be beneficial. Prefect generally supports async function definitions for tasks and hooks, which lets it manage flow execution and event handling more efficiently. However, it does not strictly require an async cancellation handler if no async operations are performed within it.
If you need to make it async, you would define it as follows:
python
async def on_cancellation(flow, flow_run, state):
    logger = get_run_logger()
    # logger.info is a regular synchronous call, so it must not be awaited.
    logger.info("Async cancellation handler is running")
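To make the sync-versus-async point concrete, here is a minimal standard-library sketch. The `run_hook` helper and the handler names are hypothetical and only stand in for whatever invokes the handler; this is not how Prefect itself dispatches hooks. It shows that a sync and an async handler can both be run to completion, and that awaiting a plain logging call fails, because `logger.info` returns None rather than an awaitable.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("hook-demo")


def sync_handler(flow, flow_run, state):
    logger.info("sync handler ran")
    return "sync"


async def async_handler(flow, flow_run, state):
    # Logging is synchronous: call it without await.
    logger.info("async handler ran")
    await asyncio.sleep(0)  # placeholder for real async work
    return "async"


async def broken_handler(flow, flow_run, state):
    # logger.info returns None, and None cannot be awaited.
    await logger.info("this line raises TypeError")


def run_hook(hook, *args):
    """Hypothetical helper: run a sync or async hook to completion."""
    result = hook(*args)
    if inspect.iscoroutine(result):
        return asyncio.run(result)
    return result


# Stand-in string arguments; real hooks would receive Prefect objects.
args = ("demo-flow", "demo-run", "CANCELLED")
results = [run_hook(handler, *args) for handler in (sync_handler, async_handler)]

try:
    run_hook(broken_handler, *args)
    broken_error = None
except TypeError as exc:
    broken_error = type(exc).__name__

print(results, broken_error)
```

An async handler is therefore only worth the extra ceremony when it actually awaits something, such as an async HTTP client or database call.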
Ultimately, whether it must be async depends on the use case. If the handler only runs blocking (synchronous) code, making it async is not strictly necessary. For further reading, check out Prefect's official documentation on cancellation handlers or browse the operational guides.