is there a way to find which tasks are holding the...
# ask-community
d
is there a way to find which tasks are holding the task tag concurrency locks? we have a case where our lock is taken up by a task when no flows are running
a
are you asking for 1.0 or 2.0?
d
1.0
a
in the UI here https://cloud.prefect.io/team/task-concurrency you can see how many task runs are using this tag and what the limit is, but to see exactly which task and flow runs are using it, you would need to either inspect manually via the UI or write a GraphQL query
can you explain your use case? are you trying to kill some runs that occupy the limits and you don't know which ones to kill?
d
last night we were doing some manual flow start/stop work in the UI. during that time, a task tagged with a limit of 1 grabbed the lock but never released it (edit: but its flow stopped and wasn't running), so overnight our flows kept waiting in the Queued state indefinitely. i noticed this morning and bumped our tag limit to 2, which allowed the flows to start processing again, but that one task still hasn't actually released the lock
a
so the flow run finished, but the task run didn't?
why are you setting this limit - could you explain your use case more on a business level? otherwise I'm not sure how I can help. what's your end goal with this limit? Do you have some work that requires a flow run to basically be never-ending, i.e. when one flow run finishes, it should immediately trigger the next run? I saw some users setting a limit of 1 for that purpose - curious if this is also your use case
d
either finished or was cancelled. it's hard to know what flow it happened on:
• the run just before the first instance of `Queued` was canceled
• the run before that died
> it should immediately trigger the next run
this is our end-goal. right now it (a dbt sync) is scheduled hourly but often (always) runs over that hour. the task limit is to avoid two of the flows running at the same time
we have work in flight to get to the “trigger the next run automatically” paradigm
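(For context, a minimal sketch of how a tag-based concurrency limit is usually attached in Prefect 1.0 - the task and flow names here are hypothetical, not the actual flow, and the per-tag limit itself is configured in Cloud rather than in code:)

```python
# illustrative sketch only - names are hypothetical
from prefect import task, Flow

@task(tags=["dbt-run-main"])  # every run of a task carrying this tag counts against the tag's limit
def dbt_run():
    ...

with Flow("dbt-sync") as flow:
    dbt_run()
```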
a
for dbt, you could actually do it via a flow of flows - check this blog post to see an example
re: the never-ending flow, you can use a state handler (adjust to include other states than successful when needed):
```python
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


# when a run of this flow finishes successfully, immediately kick off the next run
def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful():
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
```
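(A possible variant of the handler above, per the "adjust to include other states than successful" note - a sketch assuming you also want the chain to continue after a failed run:)

```python
# sketch: restart on failure as well as on success
def never_ending_state_handler(obj, old_state, new_state):
    if new_state.is_successful() or new_state.is_failed():
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state
```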
d
we are using a flow of flows currently +1 🙂
👍 1
i hadn't thought of `state_handlers` for the cyclic flow, i was just putting the `create_flow_run` after waiting for the others to be done. i like the `state_handlers` better
👍 1
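(A rough sketch of the pattern described above - creating the next run only after waiting for the current one to be done - with hypothetical flow/project names:)

```python
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("dbt-orchestrator") as flow:
    # start the dbt sync and block until it reaches a finished state
    run_id = create_flow_run(flow_name="dbt-sync", project_name="community")
    done = wait_for_flow_run(run_id, raise_final_state=True)
    # only then kick off the next cycle by re-running this orchestrator
    create_flow_run(
        flow_name="dbt-orchestrator",
        project_name="community",
        upstream_tasks=[done],
    )
```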
is the stuck task information something you'd like to have from a bug/info-gathering perspective? otherwise i'll likely just recreate the task limit in hopes of removing that one zombie task
a
you can always go to the flow run page and from there navigate to the task run page and set the state to finished?
LMK if I can help in some other way, but I guess a combination of a flow of flows + this state handler may be what you need (rather than concurrency limits)
concurrency limits are usually for things like: you can't run more than 10 flows/tasks talking to a DB, to avoid creating too many simultaneous DB connections hammering the DB
k
Could you try a query like this in the interactive API (assuming you still have a problem)?
```graphql
# replace "tag" with the concurrency tag whose limit is exhausted
query {
  task_run(where: {state: {_eq: "Running"},
                   task: {tags: {_contains: "tag"}}}) {
    id
    flow_run {
      name
      id
    }
    task {
      name
      tags
    }
  }
}
```
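(If the interactive API is inconvenient, the same query can also be run from Python via the Prefect 1.0 client - a sketch:)

```python
from prefect import Client

client = Client()
result = client.graphql(
    """
    query {
      task_run(where: {state: {_eq: "Running"},
                       task: {tags: {_contains: "tag"}}}) {
        id
        flow_run { name id }
        task { name tags }
      }
    }
    """
)
print(result)  # same shape as the interactive API response below
```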
d
ah nice, that found the task
```json
{
  "data": {
    "task_run": [
      {
        "id": "2caa3558-931c-4243-8713-f3189831f4cc",
        "flow_run": {
          "name": "sensible-shellfish-main",
          "id": "a1c3ccd1-89e1-4657-8a0c-5d47b211987b"
        },
        "task": {
          "name": "dbt_run",
          "tags": [
            "dbt-run",
            "dbt-run-main"
          ]
        }
      },
      {
        "id": "3ebcb221-f333-438c-a962-2d86f0f51103",
        "flow_run": {
          "name": "brass-hog-main",
          "id": "34b08fe9-14e2-4f33-835e-b3df2a96f466"
        },
        "task": {
          "name": "dbt_run",
          "tags": [
            "dbt-run",
            "dbt-run-main"
          ]
        }
      }
    ]
  }
}
```
the last one there is the culprit
its flow was cancelled but it is still running
all of its downstream tasks are still pending too
k
I think you can just mark the flow as failed to stop it
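(A sketch of doing that from Python with the Prefect 1.0 client, using the flow run id of the stuck "brass-hog-main" run returned by the query above:)

```python
from prefect import Client
from prefect.engine.state import Failed

client = Client()
client.set_flow_run_state(
    flow_run_id="34b08fe9-14e2-4f33-835e-b3df2a96f466",  # the stuck "brass-hog-main" run
    state=Failed(message="manually failed to release the tag concurrency slot"),
)
```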
d
it's likely some code that runs in a `finally` after the dbt run not shutting down / waiting incorrectly
k
Ohh I see.
d
that query / interactive api was super helpful, ty
👍 1