Martin Durkac
10/13/2021, 11:54 AMAnna Geller
Martin Durkac
10/13/2021, 12:38 PMAnna Geller
Finished
state, there will be a new FlowRun created.Anna Geller
Success
to avoid creating new flow runs if something is wrong in the actual flow logic.Martin Durkac
10/13/2021, 1:06 PMAnna Geller
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time
@task(log_stdout=True)
def hello_world():
print("Sleeping...")
time.sleep(4) # to have enough time to kill it
return "hello world"
def never_ending_state_handler(obj, old_state, new_state):
if old_state.is_running() and new_state.is_successful():
create_flow_run.run(flow_name="never-ending-flow", project_name="community")
return new_state
with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
hello_task = hello_world()
Kevin Kho
Martin Durkac
10/14/2021, 8:28 AMAnna Geller
Martin Durkac
10/14/2021, 8:36 AM