<@ULVA73B9P> I have a flow with several tasks. Is ...
# marvin-in-the-wild
g
@Marvin I have a flow with several tasks. Is it possible to end (change the state to Failed) an active task from its parent flow? My use case is that I'm trying to cause a flow to fail if some criteria is met. However, changing the state of the flow to Failed doesn't actually end the flow until the active tasks finish running. Is there a way to interrupt and end these active tasks? I see that there's
set_task_run_state
using the client; however, I've had no luck accessing the task UUID from the scope of the flow (when I use get_run_context, or read_flow_run, information about the tasks are not populated, maybe because they are still active?). Is this possible?
m
It seems that the documentation did not provide clear instructions for interrupting and ending active tasks from the flow itself. However, there's a code example on manually failing a task that might guide you towards a solution. Here is the code example for reference:
Copy code
python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """just a python function with a fancy hat on!"""
    return x + y

@flow
def my_flow():
    """tasks must be called in the flow to be executed (at this time)"""
    first_result = add(1, 2) 
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    # run the flow
    my_flow()
    
    # you can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by prefect, it's just a normal function
    assert add.fn(1, 2) == 3
Basic Task Example in Prefect This example does not directly answer the question but shows how tasks are structured and invoked in a flow. To manually fail a task, you might need to throw an exception within the task code or use a control mechanism provided by Prefect to change the state. It might also be helpful to explore Prefect's Timeouts feature, where flows can be configured to time out after a certain period. A suspended flow run will time out after one hour by default, causing the flow run to fail if it never resumes. This might provide an indirect way to end active tasks if a timeout can be triggered intentionally. For a more tailored solution, the use of the
prefect.Client
to manipulate task states could be necessary. You can review the Prefect Core API for functions like
set_task_run_state()
and see if there are examples for using these within a flow context. If you need further assistance with a specific implementation or workaround, I recommend checking GitHub Issues or reaching out on Prefect community channels, as it might require some in-depth context sharing.