# ask-community
g
@Marvin can I call a callback when a task run is cancelled using the endpoint `api/task_runs/{id}/set_state`?
m
thought for 144.6 seconds
Based on my research, I need to clarify something: while you can use the API endpoint `api/task_runs/{id}/set_state` to cancel a task run, setting a state through the API does not invoke any callback for you. However, there are a few alternatives you might consider:
1. State change hooks: you can attach hooks through the `on_completion`, `on_failure`, or `on_crashed` keyword arguments of the `@flow` decorator (`@task` accepts `on_completion` and `on_failure`). None of these fire on cancellation. Flows also accept an `on_cancellation` hook, but `@task` does not, and whether a flow's hook fires for an API-triggered cancellation depends on how the flow run is executed.
2. Automation: you could set up an automation in Prefect that watches for task runs entering the Cancelled state and triggers your desired callback logic.
3. Custom monitoring: you could implement your own monitoring solution that watches for state changes via the Prefect API and triggers the appropriate callbacks in your application.

If you need to respond to cancellation events specifically, could you tell me more about your use case? That would help me suggest the most appropriate solution. For example:
- Are you trying to clean up resources when a task is cancelled?
- Do you need real-time notification of cancellation?
- Are you working with the task runs programmatically or through the UI?
g
@Marvin can I call a callback when a task run is cancelled using the endpoint `api/task_runs/{id}/set_state`? I need to clean up some resources programmatically when a task run is cancelled through the endpoint. I need to do the same when a flow run is cancelled through the endpoint `api/flow_runs/{id}/set_state`. The only way I found is to trigger a cleanup DAG after calling the `set_state` endpoint.
m
thought for 43.7 seconds
Based on my research, your current approach of triggering a cleanup flow after calling the `set_state` endpoint is the most reliable solution. Here's why:
1. No built-in hook for this case: `@task` does not accept an `on_cancellation` hook at all. Flows do, but whether it fires for a cancellation set through the API depends on how the flow run is executed, so you cannot count on it from the endpoint alone.
2. API endpoint limitations: the `set_state` endpoints are designed for state management, not for triggering side effects or callbacks.
3. Best practices for your scenario: triggering a cleanup flow yourself is a good pattern because:
   - It's explicit and traceable
   - It runs in the same code path as the cancellation request, so cleanup does not depend on any hook firing
   - It keeps the cleanup logic separate and reusable

Here's a suggested pattern to formalize your approach:
```python
from prefect import flow, get_client
from prefect.states import Cancelled

async def cancel_and_cleanup(flow_run_id: str):
    async with get_client() as client:
        # First, cancel the flow run. The client expects a State object,
        # not a raw dict. Proposing prefect.states.Cancelling() instead
        # lets a worker stop the run's infrastructure before it is marked Cancelled.
        await client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Cancelled(),
        )

    # Then trigger your cleanup flow
    await cleanup_flow(flow_run_id=flow_run_id)

@flow
async def cleanup_flow(flow_run_id: str):
    # Your cleanup logic here
    ...
```
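If you call the REST endpoints directly rather than through the Python client, the same cancel-then-clean-up order can be sketched with only the standard library. This is a minimal sketch under stated assumptions: the request body shape (`{"state": {"type": ...}, "force": false}`), the `"status": "ACCEPT"` check on the response, and the helper names `build_set_state_request`, `post_json`, and `cancel_then_cleanup` are assumptions for illustration, and `run_cleanup` is a hypothetical stand-in for whatever starts your cleanup flow.

```python
import json
import urllib.request
from typing import Any, Callable, Dict

# Hypothetical sender type: takes a request, returns the parsed JSON response
Sender = Callable[[urllib.request.Request], Dict[str, Any]]


def build_set_state_request(
    api_url: str, kind: str, run_id: str, state_type: str = "CANCELLED"
) -> urllib.request.Request:
    """Build a POST to {api_url}/{kind}/{run_id}/set_state."""
    if kind not in ("task_runs", "flow_runs"):
        raise ValueError(f"unsupported run kind: {kind!r}")
    # Assumed body shape: the proposed state's type plus a `force` flag
    body = json.dumps({"state": {"type": state_type}, "force": False}).encode()
    return urllib.request.Request(
        f"{api_url.rstrip('/')}/{kind}/{run_id}/set_state",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_json(req: urllib.request.Request) -> Dict[str, Any]:
    # urlopen raises urllib.error.HTTPError on 4xx/5xx responses
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())


def cancel_then_cleanup(
    api_url: str,
    kind: str,
    run_id: str,
    run_cleanup: Callable[[str], None],
    send: Sender = post_json,
) -> bool:
    """Cancel a run through set_state, then clean up; return True if cleanup ran."""
    result = send(build_set_state_request(api_url, kind, run_id))
    # Assumption: the response reports an orchestration status such as
    # "ACCEPT" or "REJECT"; only clean up once the cancellation is accepted
    if result.get("status") != "ACCEPT":
        return False
    run_cleanup(run_id)
    return True
```

The sender is a parameter so the ordering can be exercised without a server. With a local Prefect server, `api_url` would typically be `http://127.0.0.1:4200/api`; pass `kind="flow_runs"` to cancel a flow run instead of a task run.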
Alternative approaches you could consider:
1. Structured concurrency: if you're working with async code, you could use AnyIO's cancellation scopes within your flows to handle cleanup when possible. However, this won't help with API-triggered cancellations.
2. State change automations: you could create a Prefect automation that triggers your cleanup flow whenever a flow or task run enters a Cancelled state. This is more automatic but might introduce some latency.
3. Monitoring service: you could implement a separate service that monitors flow and task run states and triggers cleanup when needed. This is more complex but gives you more control.

Given your specific need to handle API-triggered cancellations, your current approach of explicitly triggering cleanup after calling the `set_state` endpoint is a solid solution: it's clear, maintainable, and ensures that cleanup happens reliably. Would you like me to elaborate on any of these approaches or provide more specific implementation details for your use case?
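A monitoring service (option 3) boils down to polling for runs in a Cancelled state and running cleanup once per run. The sketch below assumes a hypothetical `fetch_cancelled_ids` callable that you would back with a Prefect API query (for example, reading task or flow runs filtered to the Cancelled state); `CancellationWatcher` and its methods are illustrative names, not part of Prefect.

```python
import time
from typing import Callable, Iterable, List, Optional


class CancellationWatcher:
    """Poll for cancelled run IDs and run cleanup once per run (sketch)."""

    def __init__(
        self,
        fetch_cancelled_ids: Callable[[], Iterable[str]],
        cleanup: Callable[[str], None],
    ) -> None:
        self._fetch = fetch_cancelled_ids  # hypothetical: backed by a Prefect API query
        self._cleanup = cleanup
        self._handled = set()  # run IDs whose cleanup has already succeeded

    def poll_once(self) -> List[str]:
        """Clean up newly cancelled runs; return the IDs handled in this poll."""
        newly_handled: List[str] = []
        for run_id in self._fetch():
            if run_id in self._handled:
                continue  # cleanup already ran for this run
            try:
                self._cleanup(run_id)
            except Exception:
                continue  # leave it unmarked so the next poll retries it
            self._handled.add(run_id)
            newly_handled.append(run_id)
        return newly_handled

    def run_forever(self, interval_s: float = 10.0, max_polls: Optional[int] = None) -> None:
        """Poll every `interval_s` seconds; `max_polls` bounds the loop (useful in tests)."""
        polls = 0
        while max_polls is None or polls < max_polls:
            self.poll_once()
            polls += 1
            time.sleep(interval_s)
```

The polling interval is the latency trade-off mentioned above. Because the set of handled IDs lives in memory, a restarted watcher would run cleanup again for runs it already handled, so the cleanup itself should be idempotent (or the handled IDs persisted).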
g
@Marvin I have a flow run which has task runs A, B, C and D. D is started after `allow_failure(C)`, C is started after B and B is started after A. If during the execution of A or B I cancel all task runs except D using the endpoint `api/task_runs/{id}/set_state`, would D be executed?
m
thought for 96.4 seconds
Based on my research, here's how this scenario would work. If you cancel tasks A, B, and C using the API endpoint while A or B is running, task D should still execute because:
a) It's downstream of `allow_failure(C)`, which means it's designed to run regardless of C's state.
b) API-triggered cancellation of specific task runs doesn't automatically cancel downstream tasks.

However, there are some important caveats:
1. Flow run state: make sure you're only cancelling the specific task runs and not the entire flow run. If you cancel the flow run, all tasks including D will be cancelled.
2. Timing matters:
   - If you cancel A while it's running, B won't start (as it depends on A)
   - If you cancel B while it's running, C won't start
   - But in either case, D should still execute after `allow_failure(C)`

Here's a code example to illustrate this behavior:
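```python
from prefect import allow_failure, flow, task

@task
async def task_a():
    return "A"

@task
async def task_b():
    return "B"

@task
async def task_c():
    return "C"

@task
async def task_d():
    return "D"

@flow
async def my_flow():
    # return_state=True returns each task's final State instead of raising,
    # so wait_for can check the upstream state before the next task starts
    a = await task_a(return_state=True)
    b = await task_b(wait_for=[a], return_state=True)
    c = await task_c(wait_for=[b], return_state=True)
    # allow_failure(c) relaxes the rule that C must complete before D starts
    d = await task_d(wait_for=[allow_failure(c)], return_state=True)
```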
To verify this behavior in your specific case, I recommend:
1. Running a test flow with this pattern
2. Using the API to cancel tasks A, B, and C during execution
3. Monitoring the state transitions in the UI or via the API

Would you like me to provide more details about any aspect of this behavior or help you set up a test case to verify it?
g
@Marvin would D still run even if C does not start because B has been cancelled?
m
thought for 64.1 seconds
Based on my research, I need to correct my previous answer: no, task D would not run if task C never starts because B was cancelled. Here's why:
1. `allow_failure(C)` only relaxes the requirement that C complete successfully: D may still start if C ends in a Failed state. Any other non-Completed state for C, including Cancelled or Pending, still blocks D.
2. If task C never starts (because B was cancelled), C is left in a Pending state rather than a Failed one, so `allow_failure` has nothing to tolerate.

The sequence would be:
1. Task B is cancelled.
2. Because B is cancelled, C never starts (it stays in a Pending state, which Prefect names `NotReady`).
3. Since C never starts, D won't run either, even with `allow_failure(C)`.

`allow_failure` is designed to let a downstream task proceed after an upstream failure, not to bypass the entire dependency chain. If you need D to run regardless of what happens to A, B, or C, you need to restructure your flow to remove the dependency chain or use a different pattern. Some alternatives:
1. Make D completely independent:
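```python
@flow
async def my_flow():
    # return_state=True keeps a cancelled or failed task from raising
    # inside the flow, so execution always moves on to D
    a = await task_a(return_state=True)
    b = await task_b(wait_for=[a], return_state=True)
    c = await task_c(wait_for=[b], return_state=True)
    # D runs independently: it has no wait_for dependency on the chain
    d = await task_d()
```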
2. Use parallel execution if D should start at the same time as A:
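```python
import asyncio

@flow
async def my_flow():
    async def chain():
        # The A -> B -> C chain, run as a single coroutine
        a = await task_a(return_state=True)
        b = await task_b(wait_for=[a], return_state=True)
        return await task_c(wait_for=[b], return_state=True)

    # Start both paths simultaneously; D does not wait for the chain
    c_state, d_state = await asyncio.gather(chain(), task_d(return_state=True))
```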
3. If you need some form of dependency but want D to run even if previous tasks don't start, you might need to implement custom state handling or use a different flow structure entirely. Would you like me to elaborate on any of these alternatives or help you design a flow structure that better meets your needs?
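The dependency rule from the corrected answer can be modelled in a few lines of plain Python, which makes it easy to check scenarios like "B cancelled" before testing against a real server. This is a simplified model written for illustration, not Prefect's implementation: `StateType` only mirrors a subset of Prefect's state type names, and `Upstream`, `can_start`, and `run_chain` are hypothetical names. It assumes the rule stated above, where every upstream must be Completed unless wrapped in `allow_failure`, in which case a Failed upstream is also accepted.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Sequence


class StateType(Enum):
    # Subset of Prefect's state type names, used here only as labels
    PENDING = "PENDING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


@dataclass(frozen=True)
class Upstream:
    state: StateType
    allow_failure: bool = False  # True if wrapped in allow_failure(...)


def can_start(upstreams: Sequence[Upstream]) -> bool:
    """Every upstream must be COMPLETED, except that one wrapped in
    allow_failure may also be FAILED (simplified model)."""
    for up in upstreams:
        if up.state is StateType.COMPLETED:
            continue
        if up.allow_failure and up.state is StateType.FAILED:
            continue
        return False
    return True


def run_chain(outcomes: Dict[str, StateType]) -> Dict[str, StateType]:
    """Final state of each task in A -> B -> C -> D, where D waits on allow_failure(C).

    `outcomes` gives the state a task ends in if it gets to start
    (default COMPLETED); a task whose upstream check fails stays PENDING.
    """
    plan = [("A", None, False), ("B", "A", False), ("C", "B", False), ("D", "C", True)]
    states: Dict[str, StateType] = {}
    for name, upstream, tolerant in plan:
        ups = [] if upstream is None else [Upstream(states[upstream], tolerant)]
        states[name] = outcomes.get(name, StateType.COMPLETED) if can_start(ups) else StateType.PENDING
    return states


print(run_chain({"B": StateType.CANCELLED})["D"])  # StateType.PENDING
print(run_chain({"C": StateType.FAILED})["D"])     # StateType.COMPLETED
```

In this model, cancelling B leaves both C and D Pending, while a failed C still lets D run, which matches the corrected answer above.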