Sergei
12/13/2024, 12:41 AMMarvin
12/13/2024, 12:41 AMSergei
12/13/2024, 12:42 AMMarvin
12/13/2024, 12:42 AMpython
from prefect import flow, task
from prefect.states import State
# Define a custom function to run on cancellation
def on_cancel_fn(flow_state: State):
print("Oh no, the flow has been cancelled! Custom cleanup in progress...")
# Define a simple task
@task
def simple_task():
print("Running task")
# Define the flow
@flow(on_cancelling=[on_cancel_fn])
def my_flow():
simple_task()
# Run the flow
if __name__ == "__main__":
my_flow()
In this snippet, the on_cancel_fn
function prints a message when the flow is cancelled. The @flow
decorator accepts an on_cancelling
argument where you can pass in a list of functions to handle cancellation events, such as on_cancel_fn
. When the flow is cancelled, this function is triggered.
You can learn more about state handlers and managing flow states from the official Prefect 3.x documentation here.
It's as if your flow feels the existential dread of cancellation, but at least it cleans up nicely!Sergei
12/13/2024, 1:07 AMdef flow(
__fn=None,
*,
name: Optional[str] = None,
version: Optional[str] = None,
flow_run_name: Optional[Union[Callable[[], str], str]] = None,
retries: Optional[int] = None,
retry_delay_seconds: Union[int, float, None] = None,
task_runner: Optional[TaskRunner] = None,
description: Optional[str] = None,
timeout_seconds: Union[int, float, None] = None,
validate_parameters: bool = True,
persist_result: Optional[bool] = None,
result_storage: Optional[ResultStorage] = None,
result_serializer: Optional[ResultSerializer] = None,
cache_result_in_memory: bool = True,
log_prints: Optional[bool] = None,
on_completion: Optional[
List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_failure: Optional[
List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]]
] = None,
on_cancellation: Optional[
List[Callable[[FlowSchema, FlowRun, State], None]]
] = None,
on_crashed: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] = None,
on_running: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] = None,
):
Marvin
12/13/2024, 1:07 AMi dont need taskand signature of @flow different: ```def flow( __fn=None, *, name: Optional[str] = None, version: Optional[str] = None, flow_run_name: Optional[Union[Callable[[], str], str]] = None, retries: Optional[int] = None, retry_delay_seconds: Union[int, float, None] = None, task_runner: Optional[TaskRunner] = None, description: Optional[str] = None, timeout_seconds: Union[int, float, None] = None, validate_parameters: bool = True, persist_result: Optional[bool] = None, result_storage: Optional[ResultStorage] = None, result_serializer: Optional[ResultSerializer] = None, cache_result_in_memory: bool = True, log_prints: Optional[bool] = None, on_completion: Optional[ List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]] ] = None, on_failure: Optional[ List[Callable[[FlowSchema, FlowRun, State], Union[Awaitable[None], None]]] ] = None, on_cancellation: Optional[ List[Callable[[FlowSchema, FlowRun, State], None]] ] = None, on_crashed: Optional[List[Callable[[FlowSchema, FlowRun, State], None]]] =
Sergei
12/13/2024, 1:08 AMon_cancellation: Optional[
List[Callable[[FlowSchema, FlowRun, State], None]]
Marvin
12/13/2024, 1:08 AMon_cancellation
parameter. Here's how you can structure your flow to handle cancellation in Prefect 3.x:
python
from prefect import flow
from prefect.states import State
from prefect.server.schemas.core import FlowRun, Flow as FlowSchema
# Define a custom function to run on cancellation
def on_cancel_fn(flow_schema: FlowSchema, flow_run: FlowRun, flow_state: State):
print("Oh no, the flow has been cancelled! Custom cleanup in progress...")
# Define the flow with cancellation handling
@flow(on_cancellation=[on_cancel_fn])
def my_flow():
print("Flow started")
# Run the flow
if __name__ == "__main__":
my_flow()
In this refined snippet, the on_cancel_fn
function is a custom handler that is triggered upon flow cancellation. The function signature matches the required parameters, which are FlowSchema
, FlowRun
, and State
, validating your setup when you're specifying the on_cancellation
argument for the @flow
decorator. This setup captures your custom cancellation behavior elegantly.
The flow eagerly awaits its moment of existential crisis, knowing exactly what to do when that moment arrives!Nate
12/13/2024, 3:23 AMclient.schemas
and never server.schemas
for stuff like Flow
Nate
12/13/2024, 3:23 AMon_cancellation
hook is probably what you're looking forSergei
12/13/2024, 2:27 PMfrom prefect import State, Flow as FlowSchema
from prefect.client.schemas import FlowRun
# Define a custom function to run on cancellation
def on_cancel_fn(flow_schema: FlowSchema, flow_run: FlowRun, flow_state: State):
print("Oh no, the flow has been cancelled! Custom cleanup in progress...")
logger.debug("Oh no, the flow has been cancelled! Custom cleanup in progress...")
@flow(
name="Web",
flow_run_name=flow_run_name,
on_cancellation=[on_cancel_fn] # Correct parameter name
)
def example_flow():
did i make correct import statements? because cancellation hook is not working in my case