Tara
01/31/2022, 6:11 AMAnna Geller
Kevin Kho
Tara
01/31/2022, 10:57 PMKevin Kho
create_flow_run
task and schedule a new scheduled_start_time
Tara
02/01/2022, 12:01 PMcreate_flow_run
within another flow to create the main flow run, or do I use create_flow_run
within the flow (at the end) in order to call itself to run again at the next scheduled time (not sure if this is possible)?Anna Geller
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time
@task(log_stdout=True)
def hello_world():
print("Sleeping...")
time.sleep(4) # to have enough time to kill it
return "hello world"
def never_ending_state_handler(obj, old_state, new_state):
if new_state.is_successful():
time.sleep(60) # as much as you want to have pause in between
create_flow_run.run(flow_name="never-ending-flow", project_name="community")
return new_state
with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
hello_task = hello_world()
What Kevin suggested was to do the same as above but instead of doing time.sleep() in between flow runs, simply add a timedelta to the scheduled_start_time as shown here:
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
from datetime import timedelta
import time
@task(log_stdout=True)
def hello_world():
print("Sleeping...")
time.sleep(4) # to have enough time to kill it
return "hello world"
def never_ending_state_handler(obj, old_state, new_state):
if new_state.is_successful():
create_flow_run.run(
flow_name="never-ending-flow",
project_name="community",
scheduled_start_time=timedelta(minutes=2),
)
return new_state
with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
hello_task = hello_world()
Tara
02/02/2022, 2:49 AM