# prefect-cloud
k
Hey folks, I'm continuing to have async tasks seemingly cancelled at random. A flow kicks off ~200 tasks with a concurrency of 5. Sometimes it works; sometimes tasks fail with:
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError
Running Cloud in K8s. Curious if anyone else has seen something similar with a high number of tasks.
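For context on the fan-out pattern (the original flow code wasn't shared, so this is a minimal stand-in, not the actual flow): ~200 awaitables with concurrency capped at 5 using a plain `asyncio.Semaphore`. The `work` coroutine is a hypothetical placeholder for the real task body.

```python
import asyncio


async def work(i: int, sem: asyncio.Semaphore) -> int:
    async with sem:  # at most 5 coroutines execute this body concurrently
        await asyncio.sleep(0)  # placeholder for the real task body
        return i


async def main() -> list[int]:
    sem = asyncio.Semaphore(5)
    # gather preserves submission order in its result list
    return await asyncio.gather(*(work(i, sem) for i in range(200)))


results = asyncio.run(main())
print(len(results))  # prints 200
```

If the flow is structured like this, a `CancelledError` bubbling out of one task will propagate through `gather` unless `return_exceptions=True` is set.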
e
Is there any more to the stack trace that you can share @KG?
k
I think it's just this, but I'll double-check.
Here are additional logs - not sure they'll be helpful, but let me know if you have any thoughts. Thanks Emil 🙏
ERROR 2024-01-18T15:02:09.965619672Z [resource.labels.containerName: prefect-job] 15:02:09.962 | ERROR | Task run 'xxx-7' - Encountered exception during execution:
  {
    "textPayload": "15:02:09.962 | ERROR   | Task run 'xxx-7' - Encountered exception during execution:",
    "insertId": "h38totpdr7adbvqr",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "project_id": "xxx",
        "location": "us-west1-b",
        "cluster_name": "xxx",
        "namespace_name": "prefect",
        "container_name": "prefect-job",
        "pod_name": "kubernetes-job-default-582j7-t7x26"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965619672Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx",
      "k8s-pod/job-name": "kubernetes-job-default-582j7"
    },
    "logName": "projects/xxx-api/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
ERROR 2024-01-18T15:02:09.965665612Z [resource.labels.containerName: prefect-job] Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult return await asyncio.wrap_future(self.future)
  {
    "textPayload": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py\", line 291, in aresult\n    return await asyncio.wrap_future(self.future)",
    "insertId": "bv8vljruhjjeol7g",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "namespace_name": "prefect",
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "container_name": "prefect-job",
        "project_id": "xxx-api",
        "cluster_name": "xxx",
        "location": "us-west1-b"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965665612Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx",
      "k8s-pod/job-name": "kubernetes-job-default-582j7"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z",
    "errorGroups": [
      {
        "id": "CJK3ru_A69ysTg"
      }
    ]
  }
ERROR 2024-01-18T15:02:09.965679707Z [resource.labels.containerName: prefect-job] asyncio.exceptions.CancelledError
  {
    "textPayload": "asyncio.exceptions.CancelledError",
    "insertId": "6ukann3czzllkzme",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "container_name": "prefect-job",
        "cluster_name": "xxx",
        "namespace_name": "prefect",
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "project_id": "xxx",
        "location": "us-west1-b"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965679707Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "k8s-pod/job-name": "kubernetes-job-default-582j7",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
ERROR 2024-01-18T15:02:09.965683700Z [resource.labels.containerName: prefect-job] {}
  {
    "insertId": "ale7age9ftq3y3kz",
    "jsonPayload": {},
    "resource": {
      "type": "k8s_container",
      "labels": {
        "container_name": "prefect-job",
        "project_id": "xxx",
        "location": "us-west1-b",
        "cluster_name": "xxx",
        "namespace_name": "prefect",
        "pod_name": "kubernetes-job-default-582j7-t7x26"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965683700Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "k8s-pod/job-name": "kubernetes-job-default-582j7",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
ERROR 2024-01-18T15:02:09.965688260Z [resource.labels.containerName: prefect-job] The above exception was the direct cause of the following exception:
  {
    "textPayload": "The above exception was the direct cause of the following exception:",
    "insertId": "9ulrehlcdmvkbjg6",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "project_id": "xxx",
        "location": "us-west1-b",
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "container_name": "prefect-job",
        "namespace_name": "prefect",
        "cluster_name": "xxx"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965688260Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/job-name": "kubernetes-job-default-582j7",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx",
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
ERROR 2024-01-18T15:02:09.965691656Z [resource.labels.containerName: prefect-job] {}
  {
    "insertId": "8ulojbkkp0o40wms",
    "jsonPayload": {},
    "resource": {
      "type": "k8s_container",
      "labels": {
        "namespace_name": "prefect",
        "project_id": "xxx",
        "cluster_name": "xxx",
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "location": "us-west1-b",
        "container_name": "prefect-job"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965691656Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "k8s-pod/job-name": "kubernetes-job-default-582j7",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
ERROR 2024-01-18T15:02:09.965695392Z [resource.labels.containerName: prefect-job] Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1760, in orchestrate_task_run result = await call.aresult() File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult raise CancelledError() from exc
  {
    "textPayload": "Traceback (most recent call last):\n  File \"/usr/local/lib/python3.10/site-packages/prefect/engine.py\", line 1760, in orchestrate_task_run\n    result = await call.aresult()\n  File \"/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py\", line 293, in aresult\n    raise CancelledError() from exc",
    "insertId": "0agq8xwj4k9lkp4o",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "container_name": "prefect-job",
        "project_id": "xxx",
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "location": "us-west1-b",
        "cluster_name": "xxx",
        "namespace_name": "prefect"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965695392Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/app_kubernetes_io/managed-by": "prefect",
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx",
      "k8s-pod/job-name": "kubernetes-job-default-582j7"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z",
    "errorGroups": [
      {
        "id": "COWSo6K-gdKYEA"
      }
    ]
  }
ERROR 2024-01-18T15:02:09.965713750Z [resource.labels.containerName: prefect-job] prefect._internal.concurrency.cancellation.CancelledError
  {
    "textPayload": "prefect._internal.concurrency.cancellation.CancelledError",
    "insertId": "nwfddyw6qr8v5pms",
    "resource": {
      "type": "k8s_container",
      "labels": {
        "pod_name": "kubernetes-job-default-582j7-t7x26",
        "location": "us-west1-b",
        "namespace_name": "prefect",
        "cluster_name": "xxx",
        "container_name": "prefect-job",
        "project_id": "xxx"
      }
    },
    "timestamp": "2024-01-18T15:02:09.965713750Z",
    "severity": "ERROR",
    "labels": {
      "k8s-pod/controller-uid": "eca345ef-e48a-434d-8609-48b9b8257ee2",
      "compute.googleapis.com/resource_name": "gke-xxx-default-pool-3edacc4a-2xwx",
      "k8s-pod/job-name": "kubernetes-job-default-582j7",
      "k8s-pod/app_kubernetes_io/managed-by": "prefect"
    },
    "logName": "projects/xxx/logs/stderr",
    "receiveTimestamp": "2024-01-18T15:02:10.760526835Z"
  }
b
Hi KG 👋 would you be willing to enable `DEBUG` level logging for this flow? That's usually a good next step for troubleshooting. If you do end up doing that, we'd be happy to take a look at them.
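For reference, one way to turn on debug logs (this assumes Prefect 2.x setting names; adapt to however your job's environment is configured):

```shell
# Set the log level via an environment variable on the flow-run job/pod
export PREFECT_LOGGING_LEVEL=DEBUG

# ...or persist it in the active Prefect profile via the CLI
prefect config set PREFECT_LOGGING_LEVEL=DEBUG
```

In Kubernetes, the env-var route usually means adding it to the work pool's base job template or the job's container spec.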
e
Also, are you using any async code? How are you executing the tasks?
k
Sure, I can enable DEBUG. We're deploying a Docker image that runs this deployment script:
import asyncio

from blocks.secrets import ALL_SECRETS
from blocks.slack import save_slack_webhooks
from blocks.filesystems import save_gcs_block

# NOTE: save_gcp_credentials, KUBERNETES_JOB, and ALL_DEPLOYMENTS are imported
# elsewhere in the real script; their modules aren't shown here.


async def main():
    # Some block .save() implementations are sync and return None;
    # only await the ones that hand back an awaitable.
    for secret in ALL_SECRETS:
        promise = secret.save()
        if promise is not None:
            await promise

    await save_gcp_credentials()
    await save_gcs_block()
    await KUBERNETES_JOB.save(KUBERNETES_JOB.name or "kubernetes-job-default", overwrite=True)

    await save_slack_webhooks()

    # Apply deployments last (list of flows, each built with Deployment.build_from_flow)
    for deployment in ALL_DEPLOYMENTS:
        await deployment.apply(upload=False)


if __name__ == "__main__":
    asyncio.run(main())
e
Hi @Bianca Hoch @Emil Christensen @KG, did you ever find a solution to this? I'm also running Airbyte syncs using Prefect (async flows), and if Airbyte restarts, my Prefect flow goes into a Cancelled state.