John Kang

    2 months ago
    I'm running into an issue where when a flow encounters an error it looks like the asyncio library throws an error. Any idea why?
    19:06:04.832 | ERROR | Flow run 'idealistic-robin' - Encountered exception during execution:
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\engine.py", line 559, in orchestrate_flow_run
        result = await run_sync(flow_call)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 56, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\to_thread.py", line 28, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 818, in run_sync_in_worker_thread
        return await future
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 754, in run
        result = context.run(func, *args)
      File "C:\Users\JKANG1\AppData\Local\Temp\tmpj8ngzmcx\w_wrapper_update_data.py", line 293, in wrapper_data_update_function
    NameError: name 'subprocess' is not defined
    19:06:04.926 | ERROR | Flow run 'idealistic-robin' - Finished in state Failed('Flow run encountered an exception.')
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_sockets.py", line 127, in try_connect
        stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 1518, in connect_tcp
        await get_running_loop().create_connection(StreamProtocol, host, port,
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 1025, in create_connection
        raise exceptions[0]
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 1010, in create_connection
        sock = await self._connect_sock(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 924, in _connect_sock
        await self.sock_connect(sock, address)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\proactor_events.py", line 702, in sock_connect
        return await self._proactor.connect(sock, address)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\windows_events.py", line 812, in _poll
        value = callback(transferred, key, ov)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\windows_events.py", line 599, in finish_connect
        ov.getresult()
    OSError: [WinError 121] The semaphore timeout period has expired
    The above exception was the direct cause of the following exception:
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_exceptions.py", line 8, in map_exceptions
        yield
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\asyncio.py", line 101, in connect_tcp
        stream: anyio.abc.ByteStream = await anyio.connect_tcp(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_sockets.py", line 184, in connect_tcp
        raise OSError('All connection attempts failed') from cause
    OSError: All connection attempts failed
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 60, in map_httpcore_exceptions
        yield
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 353, in handle_async_request
        resp = await self._pool.handle_async_request(req)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection_pool.py", line 253, in handle_async_request
        raise exc
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection_pool.py", line 237, in handle_async_request
        response = await connection.handle_async_request(request)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 86, in handle_async_request
        raise exc
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 63, in handle_async_request
        stream = await self._connect(request)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_async\connection.py", line 111, in _connect
        stream = await self._network_backend.connect_tcp(**kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\auto.py", line 23, in connect_tcp
        return await self._backend.connect_tcp(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\backends\asyncio.py", line 101, in connect_tcp
        stream: anyio.abc.ByteStream = await anyio.connect_tcp(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\contextlib.py", line 131, in __exit__
        self.gen.throw(type, value, traceback)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpcore\_exceptions.py", line 12, in map_exceptions
        raise to_exc(exc)
    httpcore.ConnectError: All connection attempts failed
    The above exception was the direct cause of the following exception:
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 193, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 140, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_eventloop.py", line 56, in run
        return asynclib.run(func, *args, **backend_options)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 233, in run
        return native_run(wrapper(), debug=debug)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 616, in run_until_complete
        return future.result()
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 228, in wrapper
        return await func(*args)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\agent.py", line 105, in start
        await agent.get_and_submit_flow_runs()
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\agent.py", line 109, in get_and_submit_flow_runs
        submittable_runs = await self.client.get_runs_in_work_queue(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\client.py", line 895, in get_runs_in_work_queue
        response = await self._client.post(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1820, in post
        return await self.request(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1506, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\client.py", line 255, in send
        await super().send(*args, **kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1593, in send
        response = await self._send_handling_auth(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1621, in _send_handling_auth
        response = await self._send_handling_redirects(
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1658, in _send_handling_redirects
        response = await self._send_single_request(request)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_client.py", line 1695, in _send_single_request
        response = await transport.handle_async_request(request)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 353, in handle_async_request
        resp = await self._pool.handle_async_request(req)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\contextlib.py", line 131, in __exit__
        self.gen.throw(type, value, traceback)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\httpx\_transports\default.py", line 77, in map_httpcore_exceptions
        raise mapped_exc(message) from exc
    httpx.ConnectError: All connection attempts failed
    An exception occurred.
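Two separate failures seem to be mixed together in this log. The flow run itself fails with `NameError: name 'subprocess' is not defined`, raised inside `wrapper_data_update_function`, which usually means the file uses `subprocess` without importing it. The later `httpx.ConnectError` chain comes from the agent polling the work queue and looks like an unrelated connection problem. A minimal sketch of the first failure, using hypothetical function names and a placeholder command rather than the real flow code:

```python
import sys


def update_data_broken():
    # Hypothetical stand-in for the flow body: `subprocess` is used here,
    # but nothing in this module imports it, so the name lookup fails.
    return subprocess.run([sys.executable, "-c", "print('ok')"])


def update_data_fixed():
    # Normally this import sits at the top of the file; it is local here
    # only so that both variants can live in one snippet.
    import subprocess

    result = subprocess.run(
        [sys.executable, "-c", "print('ok')"],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


try:
    update_data_broken()
    broken_error = None
except NameError as exc:
    broken_error = str(exc)

print(broken_error)
print(update_data_fixed())
```

If the real file only imports `subprocess` inside another function or behind a condition, the same error would appear, so checking the imports at the top of `w_wrapper_update_data.py` is a reasonable first step.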
    Anna Geller

    1 month ago
    Hi John, a great question for the PACC course 😜 I believe the latest version doesn't work well on Windows. We'll need to investigate after the GA release. Until then, can you try in WSL or in Docker?
    John Kang

    1 month ago
    @Anna Geller Thanks for getting back to me! Sorry for the slow response, I've been on a plane to San Jose all morning! I'll try on WSL later today and report back. If I can't figure it out, I'll ask during PACC tomorrow!
    @Anna Geller I've tried the updated method for creating deployments and I'm running into a problem on Windows. For some reason, even though I've named the flow 'capacity-flow', the deployment is not properly created. Just an FYI:
    (Capacity_venv) C:\Users\JKANG1\PycharmProjects\Manheim_Capacity>prefect deployment build ./main_python_files/w_wrapper_update_data.py:capacity-flow -n capacity-deployment -t test
    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 193, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\asyncutils.py", line 140, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_core\_eventloop.py", line 56, in run
        return asynclib.run(func, *args, **backend_options)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 233, in run
        return native_run(wrapper(), debug=debug)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\asyncio\base_events.py", line 616, in run_until_complete
        return future.result()
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\anyio\_backends\_asyncio.py", line 228, in wrapper
        return await func(*args)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\cli\deployment.py", line 496, in build
        flow = load_flow_from_manifest_path(path)
      File "C:\ProgramData\Anaconda3\envs\Capacity_venv\lib\site-packages\prefect\utilities\importtools.py", line 148, in load_flow_from_manifest_path
        flow = runpy.run_path(file_path)[obj_name]
    KeyError: 'capacity-flow'
    An exception occurred.
    I tried again using WSL (Ubuntu) and it does not work there either:
    (pacc) johnkangw@CAI-C3T95M3:/mnt/c/Users/JKANG1/PycharmProjects/Manheim_Capacity$ prefect deployment build ./main_python_files/w_wrapper_update_data.py:capacity-flow -n capacity-deployment -t test
    Traceback (most recent call last):
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
        return future.result()
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/cli/deployment.py", line 496, in build
        flow = load_flow_from_manifest_path(path)
      File "/home/johnkangw/anaconda3/envs/pacc/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 148, in load_flow_from_manifest_path
        flow = runpy.run_path(file_path)[obj_name]
    KeyError: 'capacity-flow'
    An exception occurred.
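Both the Windows and WSL tracebacks end at `runpy.run_path(file_path)[obj_name]`, so the part after the colon in `prefect deployment build` appears to be looked up as a Python object name in the file, not as the flow's display name. `capacity-flow` contains a hyphen and so cannot be a Python variable name, which would explain the identical `KeyError` on both platforms. A small sketch that reproduces the lookup with the standard library only; the file contents are hypothetical, and the function name `wrapper_data_update_function` is borrowed from the earlier traceback as a guess at the real flow function:

```python
import runpy
import tempfile
from pathlib import Path

# Hypothetical stand-in for w_wrapper_update_data.py. A plain function with a
# `name` attribute mimics a flow whose display name differs from its Python name.
FLOW_FILE_SOURCE = '''
def wrapper_data_update_function():
    return "flow ran"

wrapper_data_update_function.name = "capacity-flow"
'''

with tempfile.TemporaryDirectory() as tmp_dir:
    flow_file = Path(tmp_dir) / "w_wrapper_update_data.py"
    flow_file.write_text(FLOW_FILE_SOURCE)
    # Same call the traceback shows: execute the file and return its globals.
    namespace = runpy.run_path(str(flow_file))

# Looking up the display name fails, as in the traceback.
try:
    namespace["capacity-flow"]
    lookup_error = None
except KeyError as exc:
    lookup_error = repr(exc)

# Looking up the Python function name succeeds.
flow_obj = namespace["wrapper_data_update_function"]

print(lookup_error)
print(flow_obj.name, "->", flow_obj())
```

Under that assumption, the build command would reference the function name after the colon (for example `w_wrapper_update_data.py:wrapper_data_update_function`) while keeping `-n capacity-deployment -t test` unchanged.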
    @Anna Geller I really preferred the following method of deployment:
    Deployment(
        name="leonardo-deployment",
        flow="./prefect_test.py",
        tags=["tutorial", "test"],
        parameters={"name": "Leo"},
    ).create()
    I tried it, but it no longer seems to work. I also tried the method described online, and it works only if I don't import my own Python modules. The method above skipped registering all of the dependencies online and just worked. Is there something similar in Prefect 2.0?
    Anna Geller

    1 month ago
    You can create any infra block and save it, as shown here: https://github.com/anna-geller/prefect-deployment-patterns/blob/main/k8s/customizations/append_commands.py. This way you can follow a quite similar code-first approach and still be able to modify the block from the UI.
    John Kang

    1 month ago
    @Anna Geller Thanks so much! I'll take a look this morning