Majo Dencilin
09/18/2024, 6:17 AM
Crash details:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 661, in initialize_run
yield self
File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 713, in start
yield
File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 1307, in run_task_sync
with engine.run_context(), engine.transaction_context() as txn:
File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__
return next(self.gen)
^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 753, in run_context
raise CancelledError("Task run cancelled by the task runner")
asyncio.exceptions.CancelledError: Task run cancelled by the task runner
Nate
09/18/2024, 3:11 PM
Majo Dencilin
09/18/2024, 5:04 PM
Nate
09/18/2024, 5:05 PM
Majo Dencilin
09/18/2024, 7:14 PM
from prefect import flow, task
from platform import node, platform
from prefect import deployments
from lakeview import lakeview_run_job_task
from aqlibrary import get_prev_date, create_collate_param
import aqhandler
from prefect_aws.s3 import S3Bucket
from prefect_github.repository import GitHubRepository
from prefect.states import Completed
from prefect.client.schemas.schedules import CronSchedule
from prefect.task_runners import ConcurrentTaskRunner, ThreadPoolTaskRunner
import os
# --- Deployment-wide constants for the c008 integration flow ---
CID = "c008"                      # customer / tenant id, also used as tag and work queue
FLOW_NAME = "integ-aut-amdp-test"
SANDBOX = "operations"            # Lakeview sandbox the jobs run in
CLUSTER = "aqfer-preprod-eks"     # NOTE(review): currently unused in this file — confirm
# Per-task result storage key. The `{flow_run.id}` / `{task_run.name}` /
# `{task_run.id}` parts are Prefect runtime template placeholders and must
# stay literal (hence the non-f-string second half of the concatenation).
LOCATION = (
    f"results-v2/{CID}/{FLOW_NAME}/"
    "{flow_run.id}/{task_run.name}/{task_run.id}.json"
)
S3_BUCKET = "s3-bucket/s3preprod"  # Prefect block slug for flow result storage
@flow(log_prints=True, validate_parameters=False, task_runner=ThreadPoolTaskRunner(), result_storage=S3_BUCKET)
def integ_aut_amdp_test(prev_date: str | None = None):
    """Run the c008 AMDP integration pipeline as a chain of Lakeview jobs.

    Order: hourly event import -> hourly pageview import -> hourly
    collate -> daily collate. Each task stores its result under
    ``LOCATION`` in the configured S3 result storage.

    Args:
        prev_date: Optional date string forwarded to ``get_prev_date``.
            NOTE(review): the body currently hard-codes "20230726" and
            ignores this parameter — confirm whether that is intended.

    Returns:
        The skip-handler state when the flow is skipped; otherwise None
        (the submitted task futures are not awaited here).
    """
    # Project-level guard: returns a Completed state only when the flow
    # is allowed to run; any other state means "skip".
    state = aqhandler.flow_skip_handler(FLOW_NAME, return_state=True)
    if not state.is_completed():
        return state

    # NOTE(review): hard-coded date — the `prev_date` argument is ignored.
    p_date = get_prev_date(prev_date="20230726")

    # Hourly event import: first job, no upstream dependency.
    t1 = lakeview_run_job_task.with_options(
        task_run_name="aqfer-imp-aut-evt-hourly", result_storage_key=LOCATION
    ).submit(cid=CID, job="aqfer-imp-aut-evt-hourly", sandbox=SANDBOX, poll_interval=30)

    # Hourly pageview import: depends on the event import via upstream_result.
    t2 = lakeview_run_job_task.with_options(
        task_run_name="aqfer-imp-aut-pv-hourly", result_storage_key=LOCATION
    ).submit(upstream_result=t1, cid=CID, job="aqfer-imp-aut-pv-hourly", sandbox=SANDBOX, poll_interval=10)

    # Build collate parameters from the pageview-import future.
    t3 = create_collate_param(pv=t2)

    # Hourly collate: consumes the collate parameters.
    t4 = lakeview_run_job_task.with_options(
        task_run_name="aqaws-collate-aut-hourly", result_storage_key=LOCATION
    ).submit(params=t3, cid=CID, job="aqaws-collate-aut-hourly", sandbox=SANDBOX, poll_interval=30)

    # Daily collate: runs after the hourly collate completes (wait_for).
    t5 = lakeview_run_job_task.with_options(
        task_run_name="aqaws-collate-aut-daily", result_storage_key=LOCATION
    ).submit(params=p_date, cid=CID, job="aqaws-collate-aut-daily", sandbox=SANDBOX, poll_interval=30, wait_for=[t4])
# flow_deploy = flow.from_source(
# source=GitHub.load("demo-repo"),
# entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test"
if __name__ == "__main__":
    # Register a deployment for this flow: pull the source from the
    # configured GitHubRepository block and deploy it onto the
    # "new-prefect-worker" work pool, queued per customer id.
    flow.from_source(
        source=GitHubRepository.load("xxxxxxxx"),
        entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test",
    ).deploy(
        name=FLOW_NAME,
        tags=[CID],
        # Version comes from CI; None when FLOW_VERSION is unset.
        version=os.getenv("FLOW_VERSION"),
        work_queue_name=CID,
        work_pool_name="new-prefect-worker",
    )
Nate
09/18/2024, 7:16 PM
• `prefect version` output
• the rest of the traceback
Majo Dencilin
09/18/2024, 7:37 PM
Majo Dencilin
09/19/2024, 6:20 AM
Majo Dencilin
09/25/2024, 3:13 PM
Nate
09/25/2024, 4:25 PM