John Kang

07/26/2022, 11:32 PM
I'm running into an issue where when a flow encounters an error it looks like the asyncio library throws an error. Any idea why?
19:06:04.832 | ERROR | Flow run 'idealistic-robin' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 559, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "C:\Users\JKANG1\AppData\Local\Temp\tmpj8ngzmcx\w_wrapper_update_data.py", line 293, in wrapper_data_update_function
NameError: name 'subprocess' is not defined
19:06:04.926 | ERROR | Flow run 'idealistic-robin' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_sockets.py", line 127, in try_connect
    stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1518, in connect_tcp
    await get_running_loop().create_connection(StreamProtocol, host, port,
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 1025, in create_connection
    raise exceptions[0]
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 1010, in create_connection
    sock = await self._connect_sock(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 924, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\windows_events.py", line 599, in finish_connect
    ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_exceptions.py", line 8, in map_exceptions
    yield
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\asyncio.py", line 101, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_sockets.py", line 184, in connect_tcp
    raise OSError('All connection attempts failed') from cause
OSError: All connection attempts failed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 60, in map_httpcore_exceptions
    yield
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection_pool.py", line 253, in handle_async_request
    raise exc
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 86, in handle_async_request
    raise exc
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 63, in handle_async_request
    stream = await self._connect(request)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 111, in _connect
    stream = await self._network_backend.connect_tcp(**kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\auto.py", line 23, in connect_tcp
    return await self._backend.connect_tcp(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\asyncio.py", line 101, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.ConnectError: All connection attempts failed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\agent.py", line 105, in start
    await agent.get_and_submit_flow_runs()
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\agent.py", line 109, in get_and_submit_flow_runs
    submittable_runs = await self.client.get_runs_in_work_queue(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\client.py", line 895, in get_runs_in_work_queue
    response = await self._client.post(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1820, in post
    return await self.request(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1506, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\client.py", line 255, in send
    await super().send(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1593, in send
    response = await self._send_handling_auth(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1621, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1658, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1695, in _send_single_request
    response = await transport.handle_async_request(request)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectError: All connection attempts failed
An exception occurred.
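Reading the log above, two separate failures seem to be interleaved. The flow run itself appears to fail on `NameError: name 'subprocess' is not defined` at line 293 of w_wrapper_update_data.py, which usually means the module is used without being imported. The later asyncio/httpx `ConnectError` seems to come from the agent polling the API, not from the flow. A minimal sketch of the first fix, in plain Python with no Prefect dependency, is below. Only the name `wrapper_data_update_function` comes from the log; its body and the helper `run_step` are hypothetical.

```python
import subprocess  # the import the NameError suggests is missing
import sys


def run_step(args):
    """Run a child process and return its stdout with surrounding whitespace removed."""
    completed = subprocess.run(args, capture_output=True, text=True, check=True)
    return completed.stdout.strip()


def wrapper_data_update_function():
    # Hypothetical body: the real function is not shown in the thread.
    # It launches a small child Python process as a stand-in for the real work.
    return run_step([sys.executable, "-c", "print('updated')"])
```

With the import at module level, the name `subprocess` resolves wherever the function is executed, including a temporary copy of the script like the one in the traceback path.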

Anna Geller

07/27/2022, 12:08 PM
Hi John, a great question for the PACC course 😜 I believe the latest version doesn't work well on Windows; we'll need to investigate after the GA release. Until then, can you try in WSL or in Docker?

John Kang

07/27/2022, 8:21 PM
@Anna Geller Thanks for getting back to me! Sorry for the slow response, I've been on a plane to San Jose all morning! I'll try on WSL later today and report back. If I can't figure it out, I'll ask during PACC tomorrow!
@Anna Geller I've tried the updated method for creating deployments and I'm running into a problem on Windows. For some reason, even though I've named the flow 'capacity-flow', the deployment is not created properly. Just an FYI:
(Capacity_venv) C:\Users\JKANG1\PycharmProjects\Manheim_Capacity>prefect deployment build ./main_python_files/w_wrapper_update_data.py:capacity-flow -n capacity-deployment -t test
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\deployment.py", line 496, in build
    flow = load_flow_from_manifest_path(path)
  File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\importtools.py", line 148, in load_flow_from_manifest_path
    flow = runpy.run_path(file_path)[obj_name]
KeyError: 'capacity-flow'
An exception occurred.
I tried again using WSL (Ubuntu) and it does not work there either:
(pacc) johnkangw@CAI-C3T95M3:/mnt/c/Users/JKANG1/PycharmProjects/Manheim_Capacity$ prefect deployment build ./main_python_files/w_wrapper_update_data.py:capacity-flow -n capacity-deployment -t test
Traceback (most recent call last):
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/cli/deployment.py", line 496, in build
    flow = load_flow_from_manifest_path(path)
  File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 148, in load_flow_from_manifest_path
    flow = runpy.run_path(file_path)[obj_name]
KeyError: 'capacity-flow'
An exception occurred.
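The last frame in both tracebacks, `runpy.run_path(file_path)[obj_name]`, suggests that the part after the colon is looked up as a key in the script's module namespace. That key would be the Python object name, not the display name given to the flow. The sketch below reproduces that lookup with the standard library only. The script contents, the function name `capacity_flow`, and the `display_name` attribute are assumptions for illustration, since the real file is not shown in the thread.

```python
import runpy
import tempfile
from pathlib import Path

# Hypothetical stand-in for w_wrapper_update_data.py.
SCRIPT = '''
def capacity_flow():
    return "ran"

# Stand-in for a display name; it does not create a module-level key.
capacity_flow.display_name = "capacity-flow"
'''


def load_object(file_path, obj_name):
    # Same lookup shape as the frame shown in the traceback.
    return runpy.run_path(file_path)[obj_name]


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "w_wrapper_update_data.py"
        path.write_text(SCRIPT)
        try:
            load_object(str(path), "capacity-flow")
            by_display_name = "found"
        except KeyError:
            by_display_name = "KeyError"
        # Looking up the Python function name succeeds.
        fn = load_object(str(path), "capacity_flow")
        return by_display_name, fn()
```

If the flow function in the real script has an underscore-style name, pointing the build command at that function name instead of 'capacity-flow' would be the first thing to try.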
@Anna Geller I really preferred using the
Deployment(
    name="leonardo-deployment",
    flow="./prefect_test.py",
    tags=["tutorial", "test"],
    parameters={"name": "Leo"},
).create()
method of deployment. I tried it, but it no longer seems to work. I also tried the method described online, and it works only if I don't import my own Python modules. The method above skipped registering all of the dependencies online and just worked. Is there something similar in Prefect 2.0?

Anna Geller

07/28/2022, 12:25 PM
You can create any infra block and save it, as shown here: https://github.com/anna-geller/prefect-deployment-patterns/blob/main/k8s/customizations/append_commands.py This way you can follow quite a similar code-first approach and still be able to modify the block from the UI.

John Kang

07/28/2022, 3:27 PM
@Anna Geller Thanks so much! I'll take a look this morning