Andreas Nigg
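07/12/2022, 8:41 AM
import requests
from prefect import task

@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    # Fetch the subscriptions; the call takes roughly 70 seconds to return.
    response = requests.get("my_url")
    return response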
The request itself works fine if I run it manually. However, as soon as I use Prefect 2.0 (with Prefect Cloud 2.0) to run the flow/task, I run into the following exception.
The get request in the task takes about 1 minute and 10 seconds to return.
The exception does not seem to come from the server or my client: I replaced the requests.get() call in the task with an http.client request and still get the exception below, so I have a strong feeling it's somehow related to Prefect.
Exception summary:
• requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
• followed by: 10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.
Exception details: See in thread
Is there something wrong in how I use the requests module? Or is there a "hidden" timeout for prefect when a prefect-scheduled task runs for more than 1 minute?
Edit: I currently run the flow only locally, by running "python name_of_script.py"
Edit2: I'm running the python env in WSL2
Edit3: I use GCS storage as my default storage. Maybe this causes the problem?
Edit4: I was able to work around the issue by compressing the content of the response before returning it from the task. If I change the task to the following, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, so the whole flow breaks if the upload takes longer than that.
I can live with this workaround for the moment, but I'd be happy to know whether my "theory" about GCS being the problem is correct.
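import zlib
import requests
from prefect import task

@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")
    # Compress the body so the persisted task result is smaller.
    return zlib.compress(response.content)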
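Since the workaround returns compressed bytes, whatever consumes the task result has to decompress it again. The following is only a minimal sketch of that round trip using the standard-library zlib module; the helper names and the sample payload are hypothetical and not part of the original flow.

```python
import zlib


def compress_payload(raw: bytes) -> bytes:
    """Compress the raw response body before it is returned from the task."""
    return zlib.compress(raw)


def decompress_payload(packed: bytes) -> bytes:
    """Restore the original response body on the consuming side."""
    return zlib.decompress(packed)


# Hypothetical stand-in for response.content; real payloads will compress differently.
sample = b'{"subscription": "example"}' * 1000

packed = compress_payload(sample)
restored = decompress_payload(packed)

print(f"raw: {len(sample)} bytes, compressed: {len(packed)} bytes")
```

How much this helps depends entirely on how compressible the response is; repetitive JSON or CSV usually shrinks a lot, already-compressed data barely at all.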
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 520, in orchestrate_flow_run
result = await run_sync_in_interruptible_worker_thread(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "/home/andreas/github/datawarehouse/prefect-data-integrations/kayak/kayak_integration_flows.py", line 122, in integrate_kayak_subscriptions
print(subs_response.result())
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/futures.py", line 209, in result
return sync(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 136, in result
raise data
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/task_runners.py", line 212, in submit
result = await run_fn(**run_kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 793, in begin_task_run
return await orchestrate_task_run(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 927, in orchestrate_task_run
await client.persist_data(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 1538, in persist_data
storage_token = await block.write(data)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/blocks/storage.py", line 309, in write
await run_sync_in_worker_thread(upload)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2844, in upload_from_string
self.upload_from_file(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2571, in upload_from_file
created_json = self._do_upload(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2388, in _do_upload
response = self._do_resumable_upload(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2232, in _do_resumable_upload
response = upload.transmit_next_chunk(transport, timeout=timeout)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/upload.py", line 515, in transmit_next_chunk
return _request_helpers.wait_and_retry(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/_request_helpers.py", line 171, in wait_and_retry
raise error
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/_request_helpers.py", line 148, in wait_and_retry
response = func()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/upload.py", line 507, in retriable_request
result = transport.request(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/auth/transport/requests.py", line 549, in request
response = super(AuthorizedSession, self).request(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
await self._stream.send(item=buffer)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/streams/tls.py", line 202, in send
await self._call_sslobject_method(self._ssl_object.write, item)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/streams/tls.py", line 168, in _call_sslobject_method
await self.transport_stream.send(self._write_bio.read())
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1307, in send
await self._protocol.write_event.wait()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/locks.py", line 226, in wait
await fut
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
yield
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
await self._stream.send(item=buffer)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
raise TimeoutError
TimeoutError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
yield
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
raise exc
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 75, in handle_async_request
await self._send_request_body(**kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 128, in _send_request_body
await self._send_event(event, timeout=timeout)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 137, in _send_event
await self._network_stream.write(bytes_to_send, timeout=timeout)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
await self._stream.send(item=buffer)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
raise to_exc(exc)
httpcore.WriteTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/andreas/github/datawarehouse/prefect-data-integrations/kayak/kayak_integration_flows.py", line 211, in <module>
integrate_kayak_subscriptions()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/flows.py", line 370, in __call__
return enter_flow_run_engine_from_flow_call(self, parameters)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 128, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
return await fn(*args, **kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 192, in create_then_begin_flow_run
return await begin_flow_run(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 314, in begin_flow_run
terminal_state = await orchestrate_flow_run(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 199, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 1023, in report_flow_run_crashes
await client.set_flow_run_state(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 1642, in set_flow_run_state
response = await self._client.post(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 258, in send
await super().send(*args, **kwargs)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.WriteTimeout
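The first traceback ends inside google-cloud-storage's `Blob.upload_from_string`, which passes a `timeout` value down to the upload request. As far as I know, that argument defaults to 60 seconds in google-cloud-storage, which would match the one-minute limit observed here, but treat that as an assumption to verify against your installed version. The sketch below shows one way to upload a large payload yourself with a longer timeout. It bypasses Prefect's storage block entirely and is not a Prefect API; the helper name, bucket name, and object path are hypothetical.

```python
def upload_with_timeout(blob, data: bytes, timeout_seconds: float = 300.0) -> None:
    """Upload `data` through a blob-like object with an explicit timeout.

    `blob` is expected to behave like google.cloud.storage.Blob, i.e. to
    provide upload_from_string(data, timeout=...).
    """
    blob.upload_from_string(data, timeout=timeout_seconds)


# Hypothetical usage (needs google-cloud-storage and valid credentials):
#
#   from google.cloud import storage
#   blob = storage.Client().bucket("my-bucket").blob("results/subscriptions.bin")
#   upload_with_timeout(blob, payload, timeout_seconds=300.0)
```

Taking the blob as a parameter keeps the helper easy to exercise with a stub object, without touching GCS.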
Anna Geller
"I'd be happy to know, if my "theory" about GCS being the problem is correct."
I believe your theory is correct. Currently, the default mode of handling task run results is to store those as cloudpickled files in the default storage. I believe we may improve the process of what happens when the results cannot be stored due to issues connecting to remote storage. Thanks for reporting that and nice work finding a workaround! I'll open an issue to investigate a more robust approach.
@Marvin open "Improve timeout logic for handling issues when task run results cannot be written to a remote storage"
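One rough way to sanity-check the theory is to measure how large the serialized result is and what sustained upload rate would be needed to push it to remote storage within the suspected one-minute window. This sketch uses the standard-library pickle module as a stand-in for cloudpickle (for plain bytes the sizes should be very close, but that is an assumption); the helper names, the payload, and the 60-second figure are illustrative.

```python
import pickle


def pickled_size(obj) -> int:
    """Return the size in bytes of the pickled object."""
    return len(pickle.dumps(obj))


def min_throughput_mbit(size_bytes: int, timeout_seconds: float = 60.0) -> float:
    """Upload rate in Mbit/s needed to send size_bytes within timeout_seconds."""
    return size_bytes * 8 / timeout_seconds / 1e6


# Hypothetical stand-in for the task result (about 10 MB of bytes).
payload = b"subscription-record;" * 500_000

size = pickled_size(payload)
needed = min_throughput_mbit(size)
print(f"serialized result: {size} bytes, needs >= {needed:.2f} Mbit/s for 60 s")
```

If the required rate is close to or above the available upstream bandwidth, a timeout during result persistence is plausible even though the task itself succeeded.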
Marvin
07/12/2022, 11:50 AM