Grace
03/12/2024, 6:09 PM
set_task_run_state using the client; however, I've had no luck accessing the task UUID from the scope of the flow (when I use get_run_context or read_flow_run, information about the tasks is not populated, maybe because they are still active?). Is this possible?
Marvin
03/12/2024, 7:37 PM
```python
from prefect import flow, task


@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """just a python function with a fancy hat on!"""
    return x + y


@flow
def my_flow():
    """tasks must be called in the flow to be executed (at this time)"""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


if __name__ == "__main__":
    # run the flow
    my_flow()

    # you can run tasks' functions directly outside of a flow if you want
    # but it won't be tracked by prefect, it's just a normal function
    assert add.fn(1, 2) == 3
```
Basic Task Example in Prefect
This example does not directly answer the question, but it shows how tasks are structured and invoked in a flow. To manually fail a task, you might need to raise an exception within the task code or use a control mechanism provided by Prefect to change the state.
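As a rough illustration of the "fail it from inside the task" option, here is a minimal sketch. The names ManualFailure, ensure_positive, build_flow and checked are made up for this example and are not part of Prefect; the only Prefect pieces assumed are the flow and task decorators already used above.

```python
class ManualFailure(RuntimeError):
    """Hypothetical exception raised to fail a task on purpose."""


def ensure_positive(value: int) -> int:
    """Plain-Python check; raising here is what fails the task that calls it."""
    if value <= 0:
        raise ManualFailure(f"refusing to continue with value={value}")
    return value


def build_flow():
    """Wrap the check in a Prefect task and flow.

    Prefect is imported lazily so the check above can be tried without it.
    """
    from prefect import flow, task

    @task
    def checked(value: int) -> int:
        # An uncaught exception inside a task should leave that task run Failed
        return ensure_positive(value)

    @flow
    def my_flow(value: int) -> int:
        return checked(value)

    return my_flow
```

Calling build_flow()(-1) should leave the task run (and, unless the flow catches the exception, the flow run) in a Failed state. This only works when the task itself can decide to stop; it does not let the flow fail a different task that is already running.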
It might also be helpful to explore Prefect's Timeouts feature, where flows can be configured to time out after a certain period. A suspended flow run will time out after one hour by default, causing the flow run to fail if it never resumes. This might provide an indirect way to end active tasks if a timeout can be triggered intentionally.
For a more tailored solution, the use of the prefect.Client to manipulate task states could be necessary. You can review the Prefect Core API for functions like set_task_run_state() and see if there are examples for using these within a flow context.
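For the client route, a hedged sketch of what this could look like in Prefect 2.x or later, where the client comes from get_client() rather than prefect.Client. The helper names task_run_id_of and fail_task_run are hypothetical. The attribute names checked on the future are assumptions that differ between versions (task_run_id in Prefect 3, task_run.id in Prefect 2), so verify them against the version you run.

```python
from uuid import UUID


def task_run_id_of(future) -> UUID:
    """Best-effort lookup of the task run ID behind a submitted task.

    Assumption: Prefect 3 futures expose `task_run_id`, while Prefect 2
    futures expose `task_run.id` once the run has been created.
    """
    run_id = getattr(future, "task_run_id", None)
    if run_id is None:
        run_id = getattr(getattr(future, "task_run", None), "id", None)
    if run_id is None:
        raise LookupError("task run ID is not available on this future yet")
    return run_id


async def fail_task_run(task_run_id: UUID, reason: str):
    """Ask the API to put one task run into a Failed state."""
    # Lazy imports so the helper above stays usable without Prefect installed
    from prefect import get_client
    from prefect.states import Failed

    async with get_client() as client:
        # force=True asks the server to apply the state even if
        # orchestration rules would otherwise reject the transition
        return await client.set_task_run_state(
            task_run_id=task_run_id,
            state=Failed(message=reason),
            force=True,
        )
```

Inside an async flow this might be used as future = add.submit(1, 2) followed by await fail_task_run(task_run_id_of(future), "stopped by hand"). Note that this changes the state recorded by the API; it may not interrupt Python code that is already executing in the task.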
If you need further assistance with a specific implementation or workaround, I recommend checking GitHub Issues or reaching out on Prefect community channels, as it might require some in-depth context sharing.