Naga Sravika Bodapati
04/08/2022, 6:57 AMAnna Geller
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import pendulum
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
import time
schedule = Schedule(clocks=[IntervalClock(timedelta(minutes=5))])
def fail_if_last_flow_run_failed(obj, old_state, new_state):
if new_state.is_running():
client = Client()
query = """
query ($flow_id: uuid, $state_timestamp: timestamptz) {
flow_run(
where: {_and: [{flow_id: {_eq: $flow_id}}, {state_timestamp: {_lt: $state_timestamp}}, {state: {_neq: "Scheduled"}}]}
order_by: {state_timestamp: desc}
limit: 1
) {
name
state
end_time
state_timestamp
}
}
"""
response = client.graphql(
query=query,
variables=dict(
flow_id=prefect.context.flow_id,
state_timestamp=pendulum.now(tz="UTC").isoformat(),
),
)
last_flow_run = response["data"]["flow_run"][0]
if last_flow_run["state"] == "Failed":
logger = prefect.context.get("logger")
message = "Failing this flow run since the last flow run ended in Failure"
<http://logger.info|logger.info>(message)
return Failed(message)
return new_state
@task(log_stdout=True)
def hello_world():
print("hello world")
with Flow("hello", schedule=schedule, state_handlers=[fail_if_last_flow_run_failed]) as flow:
hw = hello_world()
Anna Geller
Naga Sravika Bodapati
04/08/2022, 9:29 AMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by