Hi folks, question related to flow scheduling: is ...
# prefect-community
a
Hi folks, question related to flow scheduling: is it possible to tell Prefect to start a schedule only if there are no other running instances of that flow? Basically I want to prevent concurrent runnings of the same flow
r
Hi! I’m still a Prefect Noob, but I stumbled on this yesterday. I think this could be your solution! https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html
a
Thanks @Raphaël Riel! I forgot to mention that we’re on Prefect Server, but Task Concurrency Limiting is available only on Prefect Cloud.
r
Oh! Good to know.
When I have to work in other systems that requires such a think, I usually rely on a Lock in another system (Redis, In the DB as a LOCK table, etc.) When the task start, it will attempt to acquire the lock. If unable to, it should raise a SKIP Signal (Ori equivalent) See:
prefect.engine.signals.SKIP
upvote 1
WDYT?
n
Hi @ale - @Raphaël Riel's almost correct with task concurrency limiting, but that would only limit the number of tasks with a given label that could enter a running state at any given time. A recent feature added to Prefect Cloud is flow concurrency limiting by label, which would do what you're looking for. However, on Prefect Server you can accomplish this instead by adding a task to the beginning of your flow which checks for any other runs and raises a cancel signal if there are other runs, proceeds otherwise. You can read more about signals here: https://docs.prefect.io/api/latest/engine/signals.html#signals
👏 1
r
That timing @nicholas 😛
😆 1
a
That’s exactly what I was looking for! Thanks a lot @nicholas 👍 Much appreciated!
👍 1
n
great minds @Raphaël Riel 😆 !
a
To get active runs from Prefect Server I would you Prefect Client API, but I can’t find any method to retrieve active flow runs given the flow name. Am I missing something? 🤔
n
You'll need to use a custom graphql call, something like this:
Copy code
from prefect import Client

@task
def check_runs():
  c = Client()

  query = """
    query RunningFlows {
       flow_run(where: { state: { _eq: "Running" } }) {
         id
       }
    }
  """
  c.graphql(query=query)
Depending on your use case, you may want to check for
Submitted
and
Running
, and probably filter out the current flow run using the
flow_run_id
embedded in the
context
.
a
Thanks again @nicholas! I think I owe you some beers 😉
😄 1
@nicholas I was able to get the right GraphQL query to check if there are other running/submitted instances of a flow. But how can I trigger an execution cancellation like I would from the UI? I don’t see any specific state to handle this. I could use FAIL, but it would be somehow misleading…
n
HI @ale - there's a mutation specifically for this:
Copy code
mutation CancelFlowRun($flowRunId: UUID!) {
  cancel_flow_run(input: { flow_run_id: $flowRunId }) {
    state
  }
}
which you can supply a
flow_run_id
(you don't need to use the variables as I have here)
👍 1
s
@nicholas Can you show me the above pieces as python program to cancel the flow if it is in running state
a
Hey @SK The GraphQL client has a convenient
cancel_flow_run
method as far as I know!
upvote 1
s
@ale Trying to do in python as below. How to pass the values to "$flowRunId: UUID!" import prefect from prefect import Client from prefect import task, Flow @task() def check_runs(): c = Client() query = """ query RunningFlowsName { flow(where: {name: {_eq: "flowstatechecktest"}}) { id } } """ print('======') print(c.graphql(query=query)) query2 = """ query RunningFlowsState { flow_run(where: {state: {_eq: "Running"}}) { state } } """ print('======') print(c.graphql(query=query2)) query3 = """ mutation CancelFlowRun($flowRunId: UUID!) { cancel_flow_run(input: {flow_run_id: $flowRunId}) { state } } """ c.graphql(query=query3) with Flow("flowstatechecktest") as flow: check_runs() flow.run()
n
Hi @S K - since this thread is quite old, would you mind posting your question in the main channel?
👍 1