#prefect-community

Sang Young Noh

05/26/2022, 1:53 PM
Hello all, I’m currently trying to run a flow with a Dask task runner. I’ve set up a Dask scheduler and client and tried running something like the following code for the flow. Is anyone familiar with this error? Thanks

Kevin Kho

05/26/2022, 2:51 PM
Hi @Sang Young Noh, this is because your workers can’t connect to the database. See this post for options. When you get a chance, could you move the flow and traceback to the thread to keep the main channel cleaner?
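One way to check the diagnosis above is to look at which API each process is configured to use. The sketch below is not from the linked post; it assumes that a Prefect 2 process without a `PREFECT_API_URL` setting falls back to a process-local API and database, which would explain a worker not finding a task run created elsewhere. The helper name `describe_api_target` is hypothetical, and it only inspects the environment variable, not values stored in a Prefect profile.

```python
import os
from typing import Mapping, Optional


def describe_api_target(env: Optional[Mapping[str, str]] = None) -> str:
    """Report which Prefect API this process appears to be configured for.

    Hypothetical helper: it only reads the PREFECT_API_URL environment
    variable and does not consult Prefect profiles or settings files.
    """
    env = os.environ if env is None else env
    api_url = env.get("PREFECT_API_URL", "").strip()
    if api_url:
        return f"shared API at {api_url}"
    # Assumption: with no API URL configured, the process uses its own
    # local API and database, so it cannot see runs created elsewhere.
    return "no PREFECT_API_URL set; likely a process-local API and database"


if __name__ == "__main__":
    print(describe_api_target())
```

Running the same function on the machine that starts the flow and inside each Dask worker container (for example with `Client.run` from `dask.distributed`) should show whether they all point at the same API.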

Sang Young Noh

05/26/2022, 2:52 PM
Sure, I’ll move it now
@flow(name='ng', task_runner=DaskTaskRunner(address="localhost:8786"))
def ng_daily_polling(MaxLate=datetime.timedelta(hours=12)):
    """Temporary function to replace the bottom NG daily polling function

    Args:
        timezone (str): Timezone for scheduling. Defaults to "Europe/London"
        description (str): More detailed description of what the particular flow does.
                          Defaults to 'Lorem Ipsum'
    Returns:
        PH
    Raises:
        Skipping
    """

    # Variables that need to be used later
    # max_late = datetime.timedelta(hours=12)
    # now = datetime.datetime.now(tz=pytz.utc)

    ctx: FlowRunContext = get_run_context()

    logger = get_run_logger()

    logger.info("INFO level log message.")
But when I look at the Docker container for the Dask worker, I get the following error:
dask-worker_2         | Traceback (most recent call last):
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
dask-worker_2         |     await self.app(scope, receive, _send)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 93, in __call__
dask-worker_2         |     raise exc
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
dask-worker_2         |     await self.app(scope, receive, sender)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
dask-worker_2         |     raise e
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
dask-worker_2         |     await self.app(scope, receive, send)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 670, in __call__
dask-worker_2         |     await route.handle(scope, receive, send)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 266, in handle
dask-worker_2         |     await self.app(scope, receive, send)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 65, in app
dask-worker_2         |     response = await func(request)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
dask-worker_2         |     response = await default_handler(request)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 227, in app
dask-worker_2         |     raw_response = await run_endpoint_function(
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
dask-worker_2         |     return await dependant.call(**values)
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/prefect/orion/api/task_runs.py", line 188, in set_task_run_state
dask-worker_2         |     orchestration_result = await models.task_runs.set_task_run_state(
dask-worker_2         |   File "/usr/local/lib/python3.8/site-packages/prefect/orion/models/task_runs.py", line 282, in set_task_run_state
dask-worker_2         |     raise ValueError(f"Invalid task run: {task_run_id}")
dask-worker_2         | ValueError: Invalid task run: bfaccc7a-e983-4780-be34-8af27802ebe4
dask-worker_2         | 2022-05-26 13:42:28,502 - distributed.worker - WARNING - Compute Failed
dask-worker_2         | Key:       Extraction-227e78f8-0-bfaccc7ae9834780be348af27802ebe4
dask-worker_2         | Function:  begin_task_run
dask-worker_2         | args:      ()
dask-worker_2         | kwargs:    {'task': <prefect.tasks.Task object at 0xffff9fd5beb0>, 'task_run': TaskRun(id=UUID('bfaccc7a-e983-4780-be34-8af27802ebe4'), name='Extraction-227e78f8-0', flow_run_id=UUID('b0643545-7761-41a6-8c1e-592b5a03da1c'), task_key='227e78f8a94bc7d03dbd490e701bda55', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('db026d63-a619-4bb7-bb42-44caaf143993'), task_inputs={'start': [], 'end': [], 'max_retries': [], 'retry_delay': []}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 5, 26, 13, 42, 28, 268661, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=20339), state=Pending(message=None, type=PENDING, result=None, task_run_id=bfaccc7a-e983-4780-be34-8af27802
dask-worker_2         | Exception: "ValueError('Invalid task run: bfaccc7a-e983-4780-be34-8af27802ebe4')"

Kevin Kho

05/26/2022, 2:53 PM
Thank you!

Sang Young Noh

05/26/2022, 3:00 PM
Is there a crash course tutorial on this procedure? Sorry again, but any further pointers on this would be much appreciated

Kevin Kho

05/26/2022, 3:01 PM
For using Cloud 2.0, the docs are here. That will be the easiest option, and it’s free for now (and will also have a generous free tier)

Sang Young Noh

05/26/2022, 5:16 PM
Thank you! I’m sure once it’s out of beta we’d be happy to sign up for Prefect Cloud 2.0