Dennis Hinnenkamp
03/25/2022, 7:02 AM
Stéphan Taljaard
03/25/2022, 7:07 AM
Anna Geller
Dennis Hinnenkamp
03/25/2022, 2:23 PM
Anna Geller
.py
🙂
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time
import pendulum
@task(log_stdout=True)
def hello_world():
    """Print a short notice, pause for four seconds, and return a greeting.

    The pause exists only to leave enough time to cancel the run by hand
    while experimenting with the self-rescheduling flow.

    Returns:
        The string ``"hello world"``.
    """
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"
def never_ending_state_handler(obj, old_state, new_state):
    """Start another run of "never-ending-flow" after a successful state.

    When ``new_state`` reports success and the current local time is within
    05:00-20:00 (inclusive), a new flow run is created via
    ``create_flow_run.run``. Outside that window only a log message is
    written, so the chain of runs stops for the day. Non-successful states
    pass through without any action.

    Args:
        obj: The object whose state changed (unused here).
        old_state: The state before the transition (unused here).
        new_state: The state after the transition; must provide
            ``is_successful()``.

    Returns:
        ``new_state``, unchanged.
    """
    if new_state.is_successful():
        logger = prefect.context.get("logger")
        now = pendulum.now().time()  # change to utcnow if you prefer UTC time
        if pendulum.time(5) <= now <= pendulum.time(20):
            logger.info("It's between 5 AM and 8 PM - creating a new run...")
            create_flow_run.run(
                flow_name="never-ending-flow", project_name="community"
            )
        else:
            logger.info(
                "It's NOT between 5 AM and 8 PM - ending flow runs for today"
            )
    # Always hand the state back, whichever branch ran above.
    return new_state
# Build the flow and attach the handler at flow level through `state_handlers`;
# the handler re-creates a run of this same flow by name ("never-ending-flow").
with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
if pendulum.time(5) <= now <= pendulum.time(20):
Dennis Hinnenkamp
03/25/2022, 3:37 PM