Samuel Hinton
10/26/2021, 10:43 PM
Kevin Kho
Samuel Hinton
10/26/2021, 10:49 PM
Samuel Hinton
10/26/2021, 10:50 PM
Kevin Kho
Samuel Hinton
10/26/2021, 10:54 PM
Samuel Hinton
10/26/2021, 10:56 PM
Samuel Hinton
10/26/2021, 10:56 PM
Kevin Kho
Kevin Kho
Samuel Hinton
10/26/2021, 11:02 PM
Samuel Hinton
10/26/2021, 11:11 PM
Kevin Kho
`Client().graphql()` to hit the API. The state handler would be applied on the scheduled -> running transition.
In this state handler, use the Client to query for the number of flows running.
```graphql
query {
  flow(where: {
    name: {_eq: "..."},
    project: {name: {_eq: "bristech"}},
    archived: {_eq: false}
  }) {
    name
    project {
      name
    }
    archived
    flow_runs(where: {state: {_eq: "Running"}}) {
      id
      state
    }
  }
}
```
And then use Python to count the number.
If the number of running flows is already at the concurrency you want for that flow, then just return SUCCESS.
Kevin Kho
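A minimal sketch of that counting step in plain Python, assuming the result has the nested shape of the query above (`data`, then a list under `flow`, then `flow_runs`) and can be indexed like a dict. The helper names `count_running_runs` and `at_concurrency_limit`, the `limit` parameter and the example payload are hypothetical illustrations, not part of Prefect.

```python
from typing import Any, Dict, List


def count_running_runs(result: Dict[str, Any]) -> int:
    """Count flow runs in the "Running" state across every flow the query matched."""
    # `flow` is a list, because a `where` clause can match more than one flow.
    flows: List[Dict[str, Any]] = result["data"]["flow"]
    # The query already filters on Running; the check keeps this safe if the filter changes.
    return sum(
        1
        for flow in flows
        for run in flow["flow_runs"]
        if run["state"] == "Running"
    )


def at_concurrency_limit(result: Dict[str, Any], limit: int) -> bool:
    """True when the number of running flow runs has already reached `limit`."""
    return count_running_runs(result) >= limit


# Hypothetical payload shaped like the query above (values made up for illustration).
example = {
    "data": {
        "flow": [
            {
                "name": "my-flow",
                "project": {"name": "bristech"},
                "archived": False,
                "flow_runs": [{"id": "run-1", "state": "Running"}],
            }
        ]
    }
}

print(at_concurrency_limit(example, limit=1))  # True
print(at_concurrency_limit(example, limit=2))  # False
```

Summing over every entry in `flow` avoids assuming the query matched exactly one flow; if it did, the result is the same as reading `flow[0]`.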
Kevin Kho
Samuel Hinton
10/26/2021, 11:14 PM
Kevin Kho
Samuel Hinton
10/26/2021, 11:15 PM
Kevin Kho
Samuel Hinton
10/26/2021, 11:37 PM
Kevin Kho
Anna Geller
I would want to say “Look, I don't care if you've missed the last 20 of them, don't try and run all 20 of them, just run the last one”
There is also a UI feature to clear late runs with one click.
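A minimal sketch of the “just run the last one” rule, assuming each backlogged run is a dict with an `id` and an ISO 8601 `scheduled_start_time` carrying a `+00:00` style offset (before Python 3.11, `datetime.fromisoformat` rejects a trailing `Z`). The function names and example ids are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Optional


def latest_scheduled_run_id(flow_runs: List[Dict[str, str]]) -> Optional[str]:
    """Return the id of the run with the latest scheduled_start_time, or None if there are none."""
    if not flow_runs:
        return None
    latest = max(
        flow_runs,
        key=lambda run: datetime.fromisoformat(run["scheduled_start_time"]),
    )
    return latest["id"]


def runs_to_cancel(flow_runs: List[Dict[str, str]]) -> List[str]:
    """Ids of every backlogged run except the latest one."""
    keep = latest_scheduled_run_id(flow_runs)
    return [run["id"] for run in flow_runs if run["id"] != keep]


# Hypothetical backlog after an agent outage.
backlog = [
    {"id": "a", "scheduled_start_time": "2021-10-27T09:00:00+00:00"},
    {"id": "b", "scheduled_start_time": "2021-10-27T09:10:00+00:00"},
    {"id": "c", "scheduled_start_time": "2021-10-27T09:05:00+00:00"},
]

print(latest_scheduled_run_id(backlog))  # b
print(runs_to_cancel(backlog))  # ['a', 'c']
```

Comparing parsed datetimes rather than raw strings keeps the ordering correct even if two timestamps use different offsets.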
Samuel Hinton
10/27/2021, 9:54 AM
Kevin Mullins
10/27/2021, 5:50 PM
Kevin Kho
Kevin Mullins
10/27/2021, 5:53 PM
Samuel Hinton
10/27/2021, 5:55 PM
Samuel Hinton
10/29/2021, 2:08 PM
```python
from datetime import datetime as dt
from typing import Optional

import pytz
from prefect import Flow
from prefect.client import Client
from prefect.engine.state import Cancelled, State


def state_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():
        client = Client()
        now = dt.now(tz=pytz.UTC)
        result = client.graphql(
            """{
                flow(where: {
                    archived: {_eq: false},
                    name: {_eq: "%s"}
                }) {
                    name
                    archived
                    flow_runs (where: {
                        state: {_in: ["Scheduled", "Retrying", "Running"]},
                        scheduled_start_time:{_lte: "%s"}
                    }) {
                        scheduled_start_time
                        start_time
                        name
                        state
                        id
                    }
                }
            }"""
            % (flow.name, now.isoformat())
        )
        # These flow runs will be everything that's scheduled to start in the past and
        # might have built up.
        # `flow` comes back as a list, so take the first (and only) match.
        flow_runs = result["data"]["flow"][0]["flow_runs"]
        # I don't want to run another task if:
        # 1. There's already a flow in the running state
        # 2. If there are multiple scheduled, only the latest one should be run
        any_running = any(f["state"] == "Running" for f in flow_runs)
        if any_running:
            return Cancelled("Existing tasks are already running")
        scheduled = [f for f in flow_runs if f["state"] in ("Pending", "Scheduled")]
        if len(scheduled) > 1:
            last_scheduled_time = max(dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled)
            this_flow_run = None
            # How do I get the flow run id? It doesn't seem to be in the Flow, and not in the State either
            pass
```
Kevin Kho
`prefect.context.get("flow_run_id")` inside it
Samuel Hinton
10/29/2021, 2:10 PM
Samuel Hinton
10/29/2021, 2:11 PM
`flow.context.get(…)` work too, or is `prefect.context` going to do some scope matching or similar to know what's going on?
Kevin Kho
Samuel Hinton
10/29/2021, 2:23 PM
`flow_run_context` inside Flow itself, but it's never persisted as an attribute, apologies
Kevin Kho
Samuel Hinton
10/29/2021, 2:55 PM
Kevin Kho
Kevin Kho
Samuel Hinton
10/29/2021, 2:59 PM
Samuel Hinton
10/29/2021, 3:00 PM
Samuel Hinton
11/02/2021, 2:32 PM
```python
import datetime
from datetime import timedelta, datetime as dt
import json
import os
import gc
from typing import Optional
import pytz
import requests
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.engine.state import Failed
from prefect.utilities.notifications import slack_notifier
from prefect.engine.signals import SKIP
from prefect.engine.state import Cancelled, State
from prefect.client import Client


def concurrent_handler(flow: Flow, old_state: State, new_state: State) -> Optional[State]:
    if old_state.is_pending() and new_state.is_running():
        client = Client()
        now = dt.now(tz=pytz.UTC).replace(microsecond=0) + timedelta(seconds=1)
        # Replacing microseconds because the GraphQL API can't always handle the number of decimals
        result = client.graphql(
            """{
                flow(where: {
                    archived: {_eq: false},
                    name: {_eq: "%s"}
                }) {
                    name
                    archived
                    flow_runs (where: {
                        state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]},
                        scheduled_start_time:{_lte: "%s"}
                    }) {
                        scheduled_start_time
                        start_time
                        name
                        state
                        id
                    }
                }
            }"""
            % (flow.name, now.isoformat())  # Sorry for the % operator, but those {} make it a pain
        )
        # These flow runs will be everything that's scheduled to start in the past and
        # might have built up.
        logger = prefect.context.get("logger")
        # This might fail if the GraphQL query can't find anything, but I haven't seen this in practice
        flow_runs = result["data"]["flow"][0]["flow_runs"]
        # I don't want to run another task if there's already more than one flow running.
        # For me, I'm happy to have two running at once, as API issues mean we can get timeouts and
        # hangs that don't terminate easily. For other use cases, I'd generally say to cancel if
        # there's any running.
        num_running = sum(1 for f in flow_runs if f["state"] in ("Running", "Retrying"))
        if num_running > 1:
            msg = "Existing tasks are already running"
            logger.info(msg)
            return Cancelled(msg)
        # And if there are multiple scheduled, only the latest one should be run
        scheduled = [
            f for f in flow_runs if f["state"] in ("Pending", "Scheduled", "Queued", "Submitted")
        ]
        if len(scheduled) > 1:
            last_scheduled_time = max(
                dt.fromisoformat(f["scheduled_start_time"]) for f in scheduled
            )
            this_flow_run_id = prefect.context.get("flow_run_id")
            matching_runs = [f for f in scheduled if f["id"] == this_flow_run_id]
            if not matching_runs:
                logger.info(f"Current id is {this_flow_run_id}")
                logger.info(f"Flow runs are: {scheduled}")
                return Cancelled("Nope")
            this_run = matching_runs[0]
            this_run_time = dt.fromisoformat(this_run["scheduled_start_time"])
            if this_run_time != last_scheduled_time:
                msg = "Multiple scheduled tasks, this is not the last one"
                logger.info(msg)
                return Cancelled(msg)
    return new_state
```
This will allow a max of two concurrent running jobs (you can easily change this to one), and if multiple jobs are scheduled (i.e. your agent was down for a while and is now back up), only the last one will execute and the others will log and cancel. Not all the imports are necessary; I haven't filtered out the imports from the other handlers and common functions in the file.
Kevin Mullins
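The `%` formatting in the handler is only there because the query's braces clash with Python's brace-based string formatting. One alternative, sketched here under assumptions, is to keep the query text static and pass the values as GraphQL variables. The variable types `String!` and `timestamptz!` are assumptions about the Prefect Server/Cloud GraphQL schema, `build_flow_runs_request` is a hypothetical helper, and sending it with `client.graphql(query, variables=variables)` assumes the `variables` parameter of the Prefect 1.x `Client.graphql`; check both against the version you run.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Tuple

# Static query text: values arrive as GraphQL variables, so no % or {} escaping is needed.
# The variable types below are assumptions about the Prefect GraphQL schema.
FLOW_RUNS_QUERY = """
query ($name: String!, $cutoff: timestamptz!) {
  flow(where: {archived: {_eq: false}, name: {_eq: $name}}) {
    name
    archived
    flow_runs(where: {
      state: {_in: ["Submitted", "Queued", "Scheduled", "Retrying", "Running"]},
      scheduled_start_time: {_lte: $cutoff}
    }) {
      scheduled_start_time
      start_time
      name
      state
      id
    }
  }
}
"""


def build_flow_runs_request(flow_name: str, now: datetime) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper: return the query text and its variables for `flow_name`."""
    # Same cutoff as the handler: drop microseconds, then add one second.
    cutoff = now.replace(microsecond=0) + timedelta(seconds=1)
    return FLOW_RUNS_QUERY, {"name": flow_name, "cutoff": cutoff.isoformat()}


query, variables = build_flow_runs_request(
    "my-flow", datetime(2021, 11, 2, 14, 32, 5, 123456, tzinfo=timezone.utc)
)
print(variables)
# {'name': 'my-flow', 'cutoff': '2021-11-02T14:32:06+00:00'}
```

Inside the handler this would replace the `% (flow.name, now.isoformat())` interpolation. A side benefit is that a flow name containing a double quote can no longer break the query text.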
11/02/2021, 2:35 PM
Kevin Kho
Samuel Hinton
11/02/2021, 3:01 PM
Kevin Kho