Naga Sravika Bodapati
05/25/2022, 12:08 PM
Anna Geller
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time
# Schedule driven by a single interval clock with a five-minute interval.
schedule = Schedule(
    clocks=[
        IntervalClock(timedelta(minutes=5)),
    ],
)
# GraphQL query: newest run of the given flow whose state_timestamp is earlier
# than the supplied timestamp and whose state is not "Scheduled".
_LAST_FLOW_RUN_QUERY = """
query ($flow_id: uuid, $state_timestamp: timestamptz) {
flow_run(
where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
order_by: {state_timestamp: desc}
limit: 1
) {
name
state
end_time
state_timestamp
}
}
"""


def fail_if_last_flow_run_failed(obj, old_state, new_state):
    """State handler that fails this flow run if the previous run failed.

    When ``new_state`` is a running state, the handler queries the backend
    for the most recent run of the current flow (by ``state_timestamp``,
    ignoring runs in the ``Scheduled`` state). If that run's state is
    ``"Failed"``, a ``Failed`` state is returned in place of ``new_state``.

    Args:
        obj: The object the handler is attached to (not used).
        old_state: The state being left (not used).
        new_state: The state being entered.

    Returns:
        ``Failed(message)`` when the most recent earlier run is in the
        ``"Failed"`` state; otherwise ``new_state`` unchanged.
    """
    # Only intervene on the transition into a running state.
    if not new_state.is_running():
        return new_state

    client = Client()
    response = client.graphql(
        query=_LAST_FLOW_RUN_QUERY,
        variables=dict(
            flow_id=prefect.context.flow_id,
            state_timestamp=pendulum.now(tz="UTC").isoformat(),
        ),
    )
    # NOTE(review): the query filters on flow_id and timestamp only, so it may
    # also match this run's own backend record -- confirm against the backend.
    earlier_runs = response["data"]["flow_run"]

    # A flow with no matching earlier run returns an empty list; indexing [0]
    # unconditionally would raise IndexError on the very first run.
    if not earlier_runs:
        return new_state

    if earlier_runs[0]["state"] != "Failed":
        return new_state

    logger = prefect.context.get("logger")
    message = "Failing this flow run since the last flow run ended in Failure"
    logger.info(message)
    return Failed(message)
@task(log_stdout=True)
def hello_world():
    """Print a fixed greeting to stdout."""
    greeting = "hello world"
    print(greeting)
# Build the "hello" flow with the schedule and the flow-level state handler
# attached, and add the single hello_world task to it.
with Flow(
    "hello",
    schedule=schedule,
    state_handlers=[fail_if_last_flow_run_failed],
) as flow:
    hw = hello_world()
Naga Sravika Bodapati
05/25/2022, 12:19 PM
Anna Geller
from prefect.engine.state import Failed, Skipped, Cancelled
Naga Sravika Bodapati
05/25/2022, 12:22 PM
Anna Geller
return Failed(message)
Naga Sravika Bodapati
05/25/2022, 12:22 PM
Naga Sravika Bodapati
05/25/2022, 12:31 PM
Naga Sravika Bodapati
05/25/2022, 12:31 PM
Anna Geller
Anna Geller
Naga Sravika Bodapati
05/25/2022, 1:10 PM
Anna Geller
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time
@task(log_stdout=True)
def hello_world():
    """Print a notice to stdout, then block for 360 seconds."""
    pause_seconds = 360
    print("Sleeping...")
    time.sleep(pause_seconds)
# GraphQL query: runs of the given flow whose state is "Running". With
# `limit: 1` and `offset: 1` a row comes back only when at least two match.
_RUNNING_FLOW_RUNS_QUERY = """
query($flow_id: uuid) {
flow_run(
where: {_and: [{flow_id: {_eq: $flow_id}},
{state: {_eq: "Running"}}]}
limit: 1
offset: 1
) {
name
state
start_time
}
}
"""


def skip_if_running_handler(obj, old_state, new_state):
    """State handler that skips this flow run if other runs are in progress.

    When ``new_state`` is a running state, the handler queries the backend
    for runs of the current flow in the ``"Running"`` state. If the query
    returns any row, a ``Skipped`` state is returned in place of ``new_state``.

    Args:
        obj: The object the handler is attached to (not used).
        old_state: The state being left (not used).
        new_state: The state being entered.

    Returns:
        ``Skipped(message)`` when the query reports running flow runs;
        otherwise ``new_state`` unchanged.
    """
    # Only intervene on the transition into a running state.
    if not new_state.is_running():
        return new_state

    client = Client()
    response = client.graphql(
        query=_RUNNING_FLOW_RUNS_QUERY,
        variables=dict(flow_id=prefect.context.flow_id),
    )
    # NOTE(review): the `offset: 1` presumably discounts this run's own
    # record -- confirm that it is already "Running" in the backend here.
    active_flow_runs = response["data"]["flow_run"]
    if not active_flow_runs:
        return new_state

    logger = prefect.context.get("logger")
    message = "Skipping this flow run since there are already some flow runs in progress"
    logger.info(message)
    return Skipped(message)
# Build the "skip_if_running" flow with the flow-level state handler attached,
# and add the single hello_world task to it.
with Flow(
    "skip_if_running",
    state_handlers=[skip_if_running_handler],
) as flow:
    hello_task = hello_world()
Naga Sravika Bodapati
05/25/2022, 1:11 PM