John Mizerany

10/11/2022, 2:30 PM
A few of my flows (on Prefect Cloud 1) have gotten stuck in the middle of a run. When I cancel them, they also get stuck in a “Cancelling” state, and a few have been cancelling for 20 minutes. We are running our agent on EC2; could this be a memory issue on the agent?

Bianca Hoch

10/11/2022, 4:07 PM
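Hello John, you could try force-cancelling the stuck flow runs with the following code, which moves every run of the given flow version group that is in a Cancelling state to Cancelled: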
from prefect.client import Client
import prefect
import time

def cancel_runs_for_flow(version_group_id: str) -> None:
    c = Client()
    query = c.graphql(
        "\n".join(
            [
                "query {",
                "      flow_run(where: {",
                "          _and: {",
                f'            flow: {{version_group_id: {{_eq: "{version_group_id}"}}}}',
                "            state: {",
                "              _in: [",
                '                     "Cancelling"]',
                "            }",
                "          }",
                "          }",
                "        ) {",
                "      id",
                "    }}",
            ]
        )
    )

    # Each result is a dict holding the ID of a flow run stuck in Cancelling.
    result = query.get("data").get("flow_run")
    print(f"Fetched {len(result)} flow runs")
    for run in result:
        print(run)
        time.sleep(1)  # short pause between API calls
        # Force the run into the terminal Cancelled state.
        c.set_flow_run_state(flow_run_id=run["id"], state=prefect.engine.state.Cancelled())
Are there any underlying tasks in the flow runs that are stuck in a Running state?

John Mizerany

10/11/2022, 4:11 PM
Hey Bianca, the stuck runs each stopped in the middle of a specific task (none of the tasks are related), and I am not able to cancel them.
Essentially the flows are still running because they can’t be cancelled. I think we are going to try restarting our agent to see if that fixes the issue, since our other flows are running successfully with no issues.

Bianca Hoch

10/11/2022, 4:17 PM
Cancellation is a best effort on our part, and can sometimes be tricky when it comes to interrupting running code. I'll drop a link to this previous thread here, as Michael described the nuances of cancellation very well there.
A more robust way of cancelling a flow run is to cancel the job in the infrastructure itself (e.g., by killing the process on that machine or deleting the K8s job).
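For the "kill the process on that machine" option, here is a minimal sketch of a graceful-then-forceful termination on a POSIX host such as an EC2 Linux instance. It is not part of Prefect: `terminate_process`, `grace_seconds`, and `poll_interval` are hypothetical names, and it assumes you have already identified the PID of the stuck flow run process yourself.

```python
import os
import signal
import time


def _is_alive(pid: int) -> bool:
    """Return True if a process with this PID still exists (POSIX only)."""
    try:
        os.kill(pid, 0)  # signal 0 checks existence without sending a signal
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def terminate_process(pid: int, grace_seconds: float = 10.0,
                      poll_interval: float = 0.2) -> bool:
    """Send SIGTERM, wait up to grace_seconds, then fall back to SIGKILL.

    Returns True if the process was gone before SIGKILL was needed.
    """
    try:
        os.kill(pid, signal.SIGTERM)  # polite request to exit
    except ProcessLookupError:
        return True  # already exited
    deadline = time.monotonic() + grace_seconds
    while time.monotonic() < deadline:
        if not _is_alive(pid):
            return True
        time.sleep(poll_interval)
    try:
        os.kill(pid, signal.SIGKILL)  # cannot be caught or ignored
    except ProcessLookupError:
        return True  # exited right at the deadline
    return False
```

Trying SIGTERM first gives the process a chance to clean up; SIGKILL is the fallback for code that is truly hung. After the process is gone, the flow run may still need its state set to Cancelled through the API.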
To keep tasks from getting stuck in a Running state, you could use the timeout argument in the task decorator. This fails a task if it runs longer than the specified number of seconds.
from prefect import task

# some_action_on_failed is a state handler that you define yourself
@task(timeout=300, state_handlers=[some_action_on_failed])
def some_task():
    pass
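The snippet above leaves `some_action_on_failed` undefined. As a rough sketch, a Prefect 1 state handler is a function called with the task, the old state, and the new state, and it returns the state the task should end up in. The body below (logging an error) and the logger name `flow_alerts` are assumptions for illustration, not something from this thread.

```python
import logging

logger = logging.getLogger("flow_alerts")  # hypothetical logger name


def some_action_on_failed(task, old_state, new_state):
    """State handler sketch: react when a task enters a Failed state.

    Prefect 1 calls state handlers as handler(obj, old_state, new_state)
    and uses the returned state as the task's new state.
    """
    if new_state.is_failed():
        # A task that exceeds its timeout ends in a failed state too.
        logger.error("Task %r failed: %s", task.name, new_state.message)
        # Replace the log call with a real action (alert, cleanup, ...).
    return new_state
```

Returning `new_state` unchanged keeps the handler purely observational; it reacts to the failure without altering how the run proceeds.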

John Mizerany

10/11/2022, 5:39 PM
Thanks! I'll try a few of these options and see what works best.