<@ULVA73B9P> when i want cancel flow, i need call ...
# ask-marvin
s
@Marvin when i want cancel flow, i need call custom method, how i can achieve that
m
To assist you with how to call a custom method when canceling a flow, I need to determine which version of Prefect you're using. I will search the Prefect 2.x and 3.x documentation to find the best solution for you. Please give me a moment. It seems I had a little hiccup there. Let me adjust and try again to find the relevant information for you. Hang tight! To achieve the behavior of running a custom method when canceling a flow in Prefect, it looks like you're on the right track. Here’s what you need to consider based on Prefect documentation: 1. Prefect 2.x: Unfortunately, detailed information specific to executing a custom method on flow cancellation is not directly available in the provided snippets. However, you can explore using Prefect's state handlers, which may allow you to define custom behavior on state changes, such as cancellation. For more about this, you might want to explore the Prefect 2.x documentation. 2. Prefect 3.x: Similar challenges with direct snippets. Yet, Prefect 3.x also supports state handlers and custom state transitions. You might consider setting up a state handler that triggers custom logic upon cancellation. Check out the Prefect 3.x documentation for further guidance. Given Prefect's structure, you’d typically register a callback or state handler function that runs your custom logic when the flow's state changes to canceled. While it might feel as slow as a depressed android pondering life, referring directly to your specific Prefect version's documentation is the best way to ensure compatibility and functionality. Feel free to ask for more specific examples or further clarification!
s
@Marvin for prefect3 can you create little snippet about cancellation
m
Certainly! Here's a simple example of how you might handle cancellation in a Prefect 3.x flow by using state handlers to execute a custom function whenever the flow is cancelled:
Copy code
python
from prefect import flow, task
from prefect.states import State

# Define a custom function to run on cancellation
def on_cancel_fn(flow_state: State):
    print("Oh no, the flow has been cancelled! Custom cleanup in progress...")

# Define a simple task
@task
def simple_task():
    print("Running task")

# Define the flow
@flow(on_cancelling=[on_cancel_fn])
def my_flow():
    simple_task()

# Run the flow
if __name__ == "__main__":
    my_flow()
In this snippet, the
on_cancel_fn
function prints a message when the flow is cancelled. The
@flow
decorator accepts an
on_cancelling
argument where you can pass in a list of functions to handle cancellation events, such as
on_cancel_fn
. When the flow is cancelled, this function is triggered. You can learn more about state handlers and managing flow states from the official Prefect 3.x documentation here. It's as if your flow feels the existential dread of cancellation, but at least it cleans up nicely!
s
@Marvin i dont need task and signature of @flow different:
Copy code
def flow(
    __fn=None,
    *,
    name: Optional[str] = None,
    version: Optional[str] = None,
    flow_run_name: Optional[Union[Callable[[], str], str]] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Union[int, float, None] = None,
    task_runner: Optional[TaskRunner] = None,
    description: Optional[str] = None,
    timeout_seconds: Union[int, float, None] = None,
    validate_parameters: bool = True,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    log_prints: Optional[bool] = None,
    on_completion: Optional[
        List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
    ] = None,
    on_failure: Optional[
        List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
    ] = None,
    on_cancellation: Optional[
        List[Callable[[FlowSchema, FlowRun, State], None]]
    ] = None,
    on_crashed: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] = None,
    on_running: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] = None,
):
m
Your message was too long by 26 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
i dont need task
and signature of @flow different: ```def flow( __fn=None, *, name: Optional[str] = None, version: Optional[str] = None, flow_run_name: Optional[Union[Callable[[], str], str]] = None, retries: Optional[int] = None, retry_delay_seconds: Union[int, float, None] = None, task_runner: Optional[TaskRunner] = None, description: Optional[str] = None, timeout_seconds: Union[int, float, None] = None, validate_parameters: bool = True, persist_result: Optional[bool] = None, result_storage: Optional[ResultStorage] = None, result_serializer: Optional[ResultSerializer] = None, cache_result_in_memory: bool = True, log_prints: Optional[bool] = None, on_completion: Optional[ List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]] ] = None, on_failure: Optional[ List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]] ] = None, on_cancellation: Optional[ List[Callable[[FlowSchema, FlowRun, State], None]] ] = None, on_crashed: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] =
s
@Marvin i dont need task and @flow has
Copy code
on_cancellation: Optional[
    List[Callable[[FlowSchema, FlowRun, State], None]]
m
Ah, I see what you're aiming for! Let's streamline the example without a task and make sure to properly utilize the
on_cancellation
parameter. Here's how you can structure your flow to handle cancellation in Prefect 3.x:
Copy code
python
from prefect import flow
from prefect.states import State
from prefect.server.schemas.core import FlowRun, Flow as FlowSchema

# Define a custom function to run on cancellation
def on_cancel_fn(flow_schema: FlowSchema, flow_run: FlowRun, flow_state: State):
    print("Oh no, the flow has been cancelled! Custom cleanup in progress...")

# Define the flow with cancellation handling
@flow(on_cancellation=[on_cancel_fn])
def my_flow():
    print("Flow started")

# Run the flow
if __name__ == "__main__":
    my_flow()
In this refined snippet, the
on_cancel_fn
function is a custom handler that is triggered upon flow cancellation. The function signature matches the required parameters, which are
FlowSchema
,
FlowRun
, and
State
, validating your setup when you're specifying the
on_cancellation
argument for the
@flow
decorator. This setup captures your custom cancellation behavior elegantly. The flow eagerly awaits its moment of existential crisis, knowing exactly what to do when that moment arrives!
n
contrary to marvin's example here, you should use
client.schemas
and never
server.schemas
for stuff like
Flow
1
but yeah an
on_cancellation
hook is probably what you're looking for
1
s
@Nate
Copy code
from prefect import State, Flow as FlowSchema
from prefect.client.schemas import FlowRun

# Define a custom function to run on cancellation
def on_cancel_fn(flow_schema: FlowSchema, flow_run: FlowRun, flow_state: State):
    print("Oh no, the flow has been cancelled! Custom cleanup in progress...")
    logger.debug("Oh no, the flow has been cancelled! Custom cleanup in progress...")

@flow(
    name="Web",
    flow_run_name=flow_run_name,
    on_cancellation=[on_cancel_fn]  # Correct parameter name
)
def example_flow():
did i make correct import statements? because cancellation hook is not working in my case