Tara
01/31/2022, 6:11 AM
Anna Geller
Kevin Kho
Tara
01/31/2022, 10:57 PM
Kevin Kho
create_flow_run task and schedule a new scheduled_start_time
Tara
02/01/2022, 12:01 PM
create_flow_run within another flow to create the main flow run, or do I use create_flow_run within the flow (at the end) in order to call itself to run again at the next scheduled time (not sure if this is possible)?
Tara
02/01/2022, 12:04 PM
Anna Geller
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        time.sleep(60)  # as much as you want to have pause in between
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
What Kevin suggested is the same approach as above, but instead of calling time.sleep() between flow runs, you pass a timedelta as scheduled_start_time so the next run is scheduled that far in the future:
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
from datetime import timedelta
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        create_flow_run.run(
            flow_name="never-ending-flow",
            project_name="community",
            scheduled_start_time=timedelta(minutes=2),
        )
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
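To make the timedelta behaviour concrete, here is a minimal plain-Python sketch of how a relative scheduled_start_time can be read, assuming the offset is added to the current time at the moment the new run is created. resolve_start_time is a hypothetical helper written for illustration only; it is not part of Prefect and does not call the Prefect API.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def resolve_start_time(
    scheduled_start_time: Union[datetime, timedelta, None],
    now: Optional[datetime] = None,
) -> datetime:
    """Hypothetical helper: turn an absolute or relative start time into a datetime."""
    now = now or datetime.now(timezone.utc)
    if scheduled_start_time is None:
        return now  # no delay requested: start right away
    if isinstance(scheduled_start_time, timedelta):
        return now + scheduled_start_time  # relative offset from "now"
    return scheduled_start_time  # already an absolute timestamp


# Assumed example: the previous run finished at 12:04 UTC.
finished_at = datetime(2022, 2, 1, 12, 4, tzinfo=timezone.utc)
next_run = resolve_start_time(timedelta(minutes=2), now=finished_at)
print(next_run.isoformat())  # 2022-02-01T12:06:00+00:00
```

Under that assumption, timedelta(minutes=2) in the handler above means "two minutes after the handler fires", so the pause between runs no longer ties up the finished run's process the way time.sleep(60) does.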
Tara
02/02/2022, 2:49 AM