Ievgenii Martynenko
12/07/2021, 1:24 PM
Anna Geller
import time

from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run

@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"

def never_ending_state_handler(obj, old_state, new_state):
    # When the flow run finishes successfully, start the next run of the same flow.
    if new_state.is_successful():
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state

with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
Ievgenii Martynenko
12/07/2021, 1:45 PM
Ievgenii Martynenko
12/07/2021, 1:55 PM
Anna Geller
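from prefect import task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Success

@task(log_stdout=True)
def check_if_condition_met():
    # Placeholder condition: a non-empty string is always truthy, so swap in a real check.
    if "some state of the world":
        # End the task run immediately and mark it as successful.
        raise ENDRUN(Success(message="Data is already in the DB"))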
Anna Geller
Just for reference: such behavior is possible in Autosys (CA Workload Automations) or SQL Server Agent. All those solutions are doing is basically "skip if running". If you want this, you could add a state handler that queries the API for any flow run of this flow that is currently in a Running state and, if one exists, ends the current flow run immediately without doing anything.
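A rough sketch of that "skip if running" state handler, in case it helps. It is not a tested Prefect integration: the GraphQL query shape and the helper names (`other_running_runs`, `make_skip_if_running_handler`, `fetch_running_runs`, `get_current_run_id`, `build_skip_state`) are assumptions made for illustration, and the Prefect-specific pieces are injected as callables so the decision logic can be checked without a backend.

```python
from typing import Any, Callable, Dict, List, Optional

# Assumed query shape for the Prefect 1.x GraphQL API; verify it against your backend.
RUNNING_RUNS_QUERY = """
query($flow_id: uuid) {
  flow_run(where: {flow_id: {_eq: $flow_id}, state: {_eq: "Running"}}) {
    id
  }
}
"""


def other_running_runs(
    runs: List[Dict[str, Any]], current_run_id: Optional[str]
) -> List[Dict[str, Any]]:
    """Drop the current flow run from the result so it cannot block itself."""
    return [run for run in runs if run.get("id") != current_run_id]


def make_skip_if_running_handler(
    fetch_running_runs: Callable[[], List[Dict[str, Any]]],
    get_current_run_id: Callable[[], Optional[str]],
    build_skip_state: Callable[[str], Any],
) -> Callable[[Any, Any, Any], Any]:
    """Build a state handler with the (obj, old_state, new_state) signature."""

    def handler(obj: Any, old_state: Any, new_state: Any) -> Any:
        # Only check when this flow run is about to enter a Running state.
        if new_state.is_running():
            blockers = other_running_runs(fetch_running_runs(), get_current_run_id())
            if blockers:
                return build_skip_state(
                    f"Skipped: {len(blockers)} other run(s) already Running"
                )
        return new_state

    return handler
```

In Prefect 1.x the three callables could plausibly be wired up as follows: `fetch_running_runs` wraps `Client().graphql(RUNNING_RUNS_QUERY, variables=...)` and returns the `flow_run` list from the response, `get_current_run_id` reads `prefect.context.get("flow_run_id")`, and `build_skip_state` returns a `Skipped` state with the message. Treat all three as assumptions to verify before relying on them.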
Ievgenii Martynenko
12/07/2021, 2:06 PM
Kevin Kho
Ievgenii Martynenko
12/07/2021, 3:00 PM
Kevin Kho
Anna Geller
Ievgenii Martynenko
12/07/2021, 3:55 PM