# prefect-community
c
Hey all, I'm in the process of evaluating whether Prefect 2.0 is a viable solution to replace an internal tool at work. I've gone through setting up an ECS cluster and have Prefect agents listening and able to spin up a Dask scheduler and Dask CUDA worker. However, I'm running into some Prefect issues, referenced by someone else here: https://github.com/PrefectHQ/prefect/issues/6759 Does anyone have a workaround for this issue or an ETA on when it will be fixed? I've already tried setting an environment variable in the Docker image (PREFECT_API_REQUEST_TIMEOUT=60), but to no avail. Exact error I'm seeing in the dask-cuda-worker is in the 🧵
Copy code
22:13:35.586 | ERROR | Task run 'check_nvidia_smi-d00b2eb3-0' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/venv/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 109, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/venv/lib/python3.8/site-packages/anyio/_core/_sockets.py", line 218, in connect_tcp
    await event.wait()
  File "/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 733, in task_done
    exc = _task.exception()
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.8/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/venv/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 109, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/venv/lib/python3.8/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/venv/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/venv/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/venv/lib/python3.8/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
    raise exc
  File "/venv/lib/python3.8/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "/venv/lib/python3.8/site-packages/httpcore/_async/connection.py", line 111, in _connect
    stream = await self._network_backend.connect_tcp(**kwargs)
  File "/venv/lib/python3.8/site-packages/httpcore/backends/auto.py", line 29, in connect_tcp
    return await self._backend.connect_tcp(
  File "/venv/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 109, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/venv/lib/python3.8/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/venv/lib/python3.8/site-packages/prefect/client/orion.py", line 177, in api_healthcheck
    await self._client.get("/health")
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/venv/lib/python3.8/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/venv/lib/python3.8/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectTimeout

The above exception was the direct cause of the following exception:

RuntimeError: Cannot orchestrate task run 'e7534732-1e9c-4c1d-8335-75ce3edca177'. Failed to connect to API at https://api.prefect.cloud/....
This is using prefect==2.6.1, a Dask task runner on ECS, and a Prefect Cloud account.
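In case it helps narrow things down, one way to sanity-check whether that env var is actually being picked up inside the worker image would be to read the setting back from Prefect itself. A minimal sketch, assuming 2.6 exposes settings as objects in prefect.settings with a .value() accessor:
Copy code
# Minimal sanity check (sketch): run inside the dask-cuda-worker container.
# Assumes prefect 2.6.x, where settings are exposed as objects in prefect.settings.
from prefect.settings import PREFECT_API_REQUEST_TIMEOUT

# Expect 60.0 here if PREFECT_API_REQUEST_TIMEOUT=60 from the image is applied.
print("effective request timeout:", PREFECT_API_REQUEST_TIMEOUT.value())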
m
Hey @Cameron Chafetz, I'm assuming so, but do you have the Prefect API key set in the profile configuration as well? If you run
prefect config view
do you see something like this?
Copy code
PREFECT_PROFILE='test_cloud'
PREFECT_API_KEY='xxx_XXXXXXXXXXXXXXXXXXXXXXXXX' (from profile)
PREFECT_API_URL='https://api.prefect.cloud/api/accounts/XXX/workspaces/XXX' (from profile)
c
Does this need to be on the worker + scheduler, as well as the agent?
that's definitely set up for the agent, but not on the ECS tasks running the actual work
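if it helps, one way I could confirm what the worker containers actually resolve is to log the API settings from inside a task; a rough sketch, assuming the same prefect.settings objects are importable there (report_api_settings is just a hypothetical helper task):
Copy code
# Rough sketch: log which API endpoint/credentials the Dask worker's environment
# resolves. Assumes prefect 2.6.x; report_api_settings is a hypothetical diagnostic task.
from prefect import task, get_run_logger
from prefect.settings import PREFECT_API_KEY, PREFECT_API_URL


@task
def report_api_settings():
    logger = get_run_logger()
    logger.warning(f"API URL seen by worker: {PREFECT_API_URL.value()}")
    logger.warning(f"API key configured: {PREFECT_API_KEY.value() is not None}")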
m
A work queue provides a CLI command that lets a specific agent pull flow runs from that queue. I've included the documentation here: https://docs.prefect.io/ui/work-queues/. It's also worth creating a new API key to make sure the one you're using hasn't expired.
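For a queue named test, for example, that agent command should look something like:
prefect agent start -q test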
c
oh my agent has the necessary environment variables to pull from the work queue
the agent then spins up a dask scheduler and then spins up a corresponding dask worker
the dask worker is the one generating the above error
I can try to add the API key and URL to the Dask scheduler + worker to see if that fares any better
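concretely, I'm thinking of something like this in the cluster factory, assuming ECSCluster's environment dict gets applied to both the scheduler and worker tasks (the URL and key values below are placeholders):
Copy code
# Sketch only: forward the Prefect Cloud credentials to the Dask ECS tasks.
# Assumes the `environment` kwarg is applied to scheduler and worker containers
# alike; the API URL and key values are placeholders, not real credentials.
from dask_cloudprovider.aws import ECSCluster

ECSCluster(
    cluster_arn="arn:aws:ecs:us-west-1:<account>:cluster/test-dask-ecs",
    n_workers=1,
    worker_gpu=1,
    environment={
        "PREFECT_API_REQUEST_TIMEOUT": "60",
        "PREFECT_API_URL": "https://api.prefect.cloud/api/accounts/<account id>/workspaces/<workspace id>",
        "PREFECT_API_KEY": "<api key>",
    },
)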
m
Thanks @Cameron Chafetz, I'll also check with a few of my colleagues and see if we can find a solution. If this doesn't work, would you be able to post a minimal example of your code in the thread as well?
๐Ÿ‘ 1
c
Still hitting the connect timeout, unfortunately. I'm not sure how feasible this will be to reproduce without creating an ECS cluster, but
Copy code
@flow(name="Testing Dask GPU",
      task_runner=DaskTaskRunner(cluster_class=lambda *args, **kwargs: ECSCluster(cluster_arn="arn:aws:ecs:us-west-1:<account>:cluster/test-dask-ecs", n_workers=1, security_groups=['<sg id>'],  worker_gpu=1,
                                                                  scheduler_timeout="25 minutes", task_role_arn="arn:aws:iam::<account>:role/dask-task-role",
                                                                  execution_role_arn="arn:aws:iam::<account>:role/dask-execution-role",
                                                                  image="<account>.<http://dkr.ecr.us-west-1.amazonaws.com/test-prefect-gpu:test|dkr.ecr.us-west-1.amazonaws.com/test-prefect-gpu:test>",
                                                                  environment={"PREFECT_API_REQUEST_TIMEOUT": "60"})))
def gpu_flow():
    logger = get_run_logger()
    logger.warning("About to do some gpu stuff")
    check_nvidia_smi.submit()
    logger.warning(f"Finished doing gpu stuff!")


@task
def check_nvidia_smi():
    logger = get_run_logger()
    logger.warning("In check nvidia smi!")
    result = subprocess.run(["nvidia-smi"], capture_output=True, text=True, shell=True)
    logger.warning(f"Did some gpu stuff with output {result}")
This is the flow I build and deploy. I build using:
prefect deployment build ./basic_flow.py:gpu_flow -n test-dask-gpu -q test -sb s3/<my bucket>
and deploy using:
prefect deployment apply gpu_flow-deployment.yaml
I then run the flow from the Prefect Cloud UI using default params (none) and switch to the ECS console. I watch the scheduler spin up, then the worker. Then both come down, and when I check the worker's logs I see the error above.
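For completeness, the same deployment could presumably also be built from Python instead of the CLI; a sketch, assuming Deployment.build_from_flow is available in 2.6 and the S3 block name matches the -sb value above:
Copy code
# Sketch: rough Python equivalent of the `prefect deployment build` / `apply`
# commands above. Assumes prefect 2.6.x and an existing S3 block named "<my bucket>".
from prefect.deployments import Deployment
from prefect.filesystems import S3

from basic_flow import gpu_flow

deployment = Deployment.build_from_flow(
    flow=gpu_flow,
    name="test-dask-gpu",
    work_queue_name="test",
    storage=S3.load("<my bucket>"),  # placeholder block name, matching -sb s3/<my bucket>
)
deployment.apply()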
m
@Cameron Chafetz I think this is the same issue you pointed to at the beginning of our thread. Our team has been looking into it, and I'd recommend following that issue for further progress.