Sang Young Noh
05/26/2022, 1:53 PMKevin Kho
05/26/2022, 2:51 PMSang Young Noh
05/26/2022, 2:52 PMflow(name = 'ng', task_runner = DaskTaskRunner(address ="localhost:8786"))
def ng_daily_polling(MaxLate=datetime.timedelta(hours=12)):
"""Temporary function to replace the bottom NG daily polling function
Args:
timezone (str): Timezone for scheduling. Defaults to "Europe/London"
description (str): More detailed decription for the particular flow does.
Defaults to 'Lorem Ipsum'
Returns:
PH
Raises:
Skipping
"""
# Variables that need to be used later
# max_late = datetime.timedelta(hours=12)
# now = datetime.datetime.now(tz=pytz.utc)
ctx: FlowRunContext = get_run_context()
logger = get_run_logger()
<http://logger.info|logger.info>("INFO level log message.")
But when I look at the docker container for the dask, I seem to be getting the following error:
dask-worker_2 | Traceback (most recent call last):
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
dask-worker_2 | await <http://self.app|self.app>(scope, receive, _send)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 93, in __call__
dask-worker_2 | raise exc
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
dask-worker_2 | await <http://self.app|self.app>(scope, receive, sender)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
dask-worker_2 | raise e
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
dask-worker_2 | await <http://self.app|self.app>(scope, receive, send)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 670, in __call__
dask-worker_2 | await route.handle(scope, receive, send)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 266, in handle
dask-worker_2 | await <http://self.app|self.app>(scope, receive, send)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 65, in app
dask-worker_2 | response = await func(request)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
dask-worker_2 | response = await default_handler(request)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 227, in app
dask-worker_2 | raw_response = await run_endpoint_function(
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
dask-worker_2 | return await dependant.call(**values)
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/prefect/orion/api/task_runs.py", line 188, in set_task_run_state
dask-worker_2 | orchestration_result = await models.task_runs.set_task_run_state(
dask-worker_2 | File "/usr/local/lib/python3.8/site-packages/prefect/orion/models/task_runs.py", line 282, in set_task_run_state
dask-worker_2 | raise ValueError(f"Invalid task run: {task_run_id}")
dask-worker_2 | ValueError: Invalid task run: bfaccc7a-e983-4780-be34-8af27802ebe4
dask-worker_2 | 2022-05-26 13:42:28,502 - distributed.worker - WARNING - Compute Failed
dask-worker_2 | Key: Extraction-227e78f8-0-bfaccc7ae9834780be348af27802ebe4
dask-worker_2 | Function: begin_task_run
dask-worker_2 | args: ()
dask-worker_2 | kwargs: {'task': <prefect.tasks.Task object at 0xffff9fd5beb0>, 'task_run': TaskRun(id=UUID('bfaccc7a-e983-4780-be34-8af27802ebe4'), name='Extraction-227e78f8-0', flow_run_id=UUID('b0643545-7761-41a6-8c1e-592b5a03da1c'), task_key='227e78f8a94bc7d03dbd490e701bda55', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('db026d63-a619-4bb7-bb42-44caaf143993'), task_inputs={'start': [], 'end': [], 'max_retries': [], 'retry_delay': []}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 5, 26, 13, 42, 28, 268661, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=20339), state=Pending(message=None, type=PENDING, result=None, task_run_id=bfaccc7a-e983-4780-be34-8af27802
dask-worker_2 | Exception: "ValueError('Invalid task run: bfaccc7a-e983-4780-be34-8af27802ebe4')"
Kevin Kho
05/26/2022, 2:53 PMSang Young Noh
05/26/2022, 3:00 PMKevin Kho
05/26/2022, 3:01 PMSang Young Noh
05/26/2022, 5:16 PM