Hi all, is anyone facing the below problem when o...
# prefect-cloud
m
Hi all, is anyone facing the problem below, where one of the tasks suddenly crashes with the following error?
Copy code
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 661, in initialize_run
    yield self
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 713, in start
    yield
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 1307, in run_task_sync
    with engine.run_context(), engine.transaction_context() as txn:
  File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 753, in run_context
    raise CancelledError("Task run cancelled by the task runner")
asyncio.exceptions.CancelledError: Task run cancelled by the task runner
n
hi @Majo Dencilin - would you be able to share an example of how your crashing code looks?
m
@Nate are you asking for the flow code?
n
yes! at least the relevant part related to your crash above
m
@Nate here is the pipeline code
Copy code
from prefect import flow, task
from platform import node, platform
from prefect import deployments
from lakeview import lakeview_run_job_task
from aqlibrary import get_prev_date, create_collate_param
import aqhandler
from prefect_aws.s3 import S3Bucket
from prefect_github.repository import GitHubRepository
from prefect.states import Completed
from prefect.client.schemas.schedules import CronSchedule
from prefect.task_runners import ConcurrentTaskRunner, ThreadPoolTaskRunner
import os


# Constants shared by the flow definition and the deployment call in this file.
CID = "c008"
FLOW_NAME = "integ-aut-amdp-test"
SANDBOX = "operations"
CLUSTER = "aqfer-preprod-eks"

# Template passed as ``result_storage_key`` to each task in the flow. The
# brace placeholders are kept literal here; they are not formatted in this file.
LOCATION = "/".join(
    (
        "results-v2",
        CID,
        FLOW_NAME,
        "{flow_run.id}",
        "{task_run.name}",
        "{task_run.id}.json",
    )
)
# Passed as ``result_storage`` to the flow decorator.
S3_BUCKET = "s3-bucket/s3preprod"


@flow(log_prints=True, validate_parameters=False, task_runner=ThreadPoolTaskRunner(), result_storage=S3_BUCKET)
def integ_aut_amdp_test(prev_date: str | None = None):
    """Submit the hourly import jobs, then the hourly and daily collate jobs.

    The lakeview jobs are submitted to the task runner so that each one
    depends on the previous one: ``aqfer-imp-aut-evt-hourly`` ->
    ``aqfer-imp-aut-pv-hourly`` -> ``aqaws-collate-aut-hourly`` ->
    ``aqaws-collate-aut-daily``.

    Args:
        prev_date: Currently not used by the body; see the review note below.

    Returns:
        The state from ``aqhandler.flow_skip_handler`` when that state is not
        completed; otherwise ``None`` after all submitted task futures have
        finished.
    """
    state = aqhandler.flow_skip_handler(FLOW_NAME, return_state=True)
    if not state.is_completed():
        return state

    # NOTE(review): the ``prev_date`` argument is ignored and a fixed date is
    # passed instead -- confirm whether this is intentional for the test flow.
    p_date = get_prev_date(prev_date="20230726")
    t1 = lakeview_run_job_task.with_options(task_run_name="aqfer-imp-aut-evt-hourly", result_storage_key=LOCATION).submit(cid=CID, job="aqfer-imp-aut-evt-hourly", sandbox=SANDBOX, poll_interval=30)
    t2 = lakeview_run_job_task.with_options(task_run_name="aqfer-imp-aut-pv-hourly", result_storage_key=LOCATION).submit(upstream_result=t1, cid=CID, job="aqfer-imp-aut-pv-hourly", sandbox=SANDBOX, poll_interval=10)
    t3 = create_collate_param(pv=t2)
    t4 = lakeview_run_job_task.with_options(task_run_name="aqaws-collate-aut-hourly", result_storage_key=LOCATION).submit(params=t3, cid=CID, job="aqaws-collate-aut-hourly", sandbox=SANDBOX, poll_interval=30)
    t5 = lakeview_run_job_task.with_options(task_run_name="aqaws-collate-aut-daily", result_storage_key=LOCATION).submit(params=p_date, cid=CID, job="aqaws-collate-aut-daily", sandbox=SANDBOX, poll_interval=30, wait_for=[t4])

    # Block until every submitted future has finished. If the flow function
    # returns while futures are still pending, the task runner is shut down
    # and the remaining task runs crash with
    # "CancelledError: Task run cancelled by the task runner".
    for future in (t1, t2, t4, t5):
        future.wait()

# flow_deploy = flow.from_source(
#         source=GitHub.load("demo-repo"),
#         entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test"


if __name__ == "__main__":
    # Load the flow from the stored GitHub repository block, then register a
    # deployment for it on the "new-prefect-worker" work pool.
    repository = GitHubRepository.load("xxxxxxxx")
    remote_flow = flow.from_source(
        source=repository,
        entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test",
    )
    deploy_options = {
        "name": FLOW_NAME,
        "tags": [CID],
        # The version label is read from the FLOW_VERSION environment variable.
        "version": os.getenv("FLOW_VERSION"),
        "work_queue_name": CID,
        "work_pool_name": "new-prefect-worker",
    }
    remote_flow.deploy(**deploy_options)
n
thanks! if you could share the following, I can look more into this later
• `prefect version` output
• the rest of the traceback
m
I am using Prefect 3.0.1. The rest of the traceback?
The one I shared is the only traceback I got.
@Nate any findings on this?
n
sorry, I haven't had the time. if you have a minimal reproducible example of this then I think an issue would be appropriate!