Majo Dencilin
09/18/2024, 6:17 AM
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 661, in initialize_run
    yield self
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 713, in start
    yield
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 1307, in run_task_sync
    with engine.run_context(), engine.transaction_context() as txn:
  File "/usr/local/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/task_engine.py", line 753, in run_context
    raise CancelledError("Task run cancelled by the task runner")
asyncio.exceptions.CancelledError: Task run cancelled by the task runner
Nate
09/18/2024, 3:11 PM
Majo Dencilin
09/18/2024, 5:04 PM
Nate
09/18/2024, 5:05 PM
Majo Dencilin
09/18/2024, 7:14 PM
from prefect import flow, task
from platform import node, platform
from prefect import deployments
from lakeview import lakeview_run_job_task
from aqlibrary import get_prev_date, create_collate_param
import aqhandler
from prefect_aws.s3 import S3Bucket
from prefect_github.repository import GitHubRepository
from prefect.states import Completed
from prefect.client.schemas.schedules import CronSchedule
from prefect.task_runners import ConcurrentTaskRunner, ThreadPoolTaskRunner
import os
# Identifiers shared by the flow definition and its deployment.
CID = "c008"
FLOW_NAME = "integ-aut-amdp-test"
SANDBOX = "operations"
CLUSTER = "aqfer-preprod-eks"

# Result storage key template. The "{flow_run.id}", "{task_run.name}" and
# "{task_run.id}" placeholders are kept literal here; they are not filled in
# by this module.
LOCATION = "/".join(
    (
        "results-v2",
        CID,
        FLOW_NAME,
        "{flow_run.id}",
        "{task_run.name}",
        "{task_run.id}.json",
    )
)

# Value handed to the flow decorator as ``result_storage``.
S3_BUCKET = "s3-bucket/s3preprod"
@flow(log_prints=True, validate_parameters=False, task_runner=ThreadPoolTaskRunner(), result_storage=S3_BUCKET)
def integ_aut_amdp_test(prev_date: str = None):
    """Submit the hourly import jobs, then the hourly and daily collate jobs.

    The jobs are chained: the pv import takes the evt import's future as
    ``upstream_result``, the hourly collate takes parameters derived from the
    pv import, and the daily collate waits for the hourly collate.

    Args:
        prev_date: Currently unused; the date passed to ``get_prev_date`` is
            hard-coded below.

    Returns:
        The state returned by ``aqhandler.flow_skip_handler`` when that state
        is not completed (no job is submitted in that case); otherwise
        ``None`` once the last job in the chain has finished.
    """
    state = aqhandler.flow_skip_handler(FLOW_NAME, return_state=True)
    if not state.is_completed():
        return state

    # NOTE(review): the ``prev_date`` argument is ignored in favour of a fixed
    # date -- confirm this is intentional for the test flow.
    p_date = get_prev_date(prev_date="20230726")

    evt_hourly = lakeview_run_job_task.with_options(
        task_run_name="aqfer-imp-aut-evt-hourly",
        result_storage_key=LOCATION,
    ).submit(cid=CID, job="aqfer-imp-aut-evt-hourly", sandbox=SANDBOX, poll_interval=30)

    pv_hourly = lakeview_run_job_task.with_options(
        task_run_name="aqfer-imp-aut-pv-hourly",
        result_storage_key=LOCATION,
    ).submit(upstream_result=evt_hourly, cid=CID, job="aqfer-imp-aut-pv-hourly", sandbox=SANDBOX, poll_interval=10)

    collate_params = create_collate_param(pv=pv_hourly)

    collate_hourly = lakeview_run_job_task.with_options(
        task_run_name="aqaws-collate-aut-hourly",
        result_storage_key=LOCATION,
    ).submit(params=collate_params, cid=CID, job="aqaws-collate-aut-hourly", sandbox=SANDBOX, poll_interval=30)

    collate_daily = lakeview_run_job_task.with_options(
        task_run_name="aqaws-collate-aut-daily",
        result_storage_key=LOCATION,
    ).submit(params=p_date, cid=CID, job="aqaws-collate-aut-daily", sandbox=SANDBOX, poll_interval=30, wait_for=[collate_hourly])

    # Block until the last job in the chain has finished. Submitted futures
    # must be resolved before the flow function returns; otherwise the task
    # runner shuts down and cancels the still-running task runs
    # ("Task run cancelled by the task runner").
    # wait() does not raise on task failure; switch to .result() if a failed
    # job should fail the flow run.
    collate_daily.wait()
# flow_deploy = flow.from_source(
#         source=GitHub.load("demo-repo"),
#         entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test"
if __name__ == "__main__":
    # Point at the flow's entrypoint inside the repository described by the
    # stored GitHubRepository block.
    remote_flow = flow.from_source(
        source=GitHubRepository.load("xxxxxxxx"),
        entrypoint="datalake/cids/c008/flows/integ-test.py:integ_aut_amdp_test",
    )
    # Create the deployment; the version string is read from the
    # FLOW_VERSION environment variable (None when it is unset).
    remote_flow.deploy(
        name=FLOW_NAME,
        tags=[CID],
        version=os.getenv("FLOW_VERSION"),
        work_queue_name=CID,
        work_pool_name="new-prefect-worker",
    )
Nate
09/18/2024, 7:16 PM
prefect version
Majo Dencilin
09/18/2024, 7:37 PM
Majo Dencilin
09/19/2024, 6:20 AM
Majo Dencilin
09/25/2024, 3:13 PM
Nate
09/25/2024, 4:25 PM