Hi all, I've encountered a prefect 2.0 (cloud) pro...
# prefect-community
Hi all, I've encountered a Prefect 2.0 (Cloud) problem: I have a simple flow with a single task that looks as follows:
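```python
from logging import Logger
import requests
from prefect import task

@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")  # "my_url" is a placeholder for the real endpoint
    return response
```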
The request itself works fine if I run it manually. However, as soon as I run the flow/task with Prefect 2.0 (using Prefect 2.0 Cloud), I run into the following exception. The GET request in the task takes about 1 minute and 10 seconds to return. The exception does not seem to come from the server or from my client: I replaced the `requests.get()` call in the task with an `http.client` request and still got the `requests` exception below, so I have a strong feeling it's somehow related to Prefect.

Exception summary:
• `requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))`
• followed by: `10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.`

Exception details: see the thread.

Is there something wrong with how I use the `requests` module? Or does Prefect have a "hidden" timeout when a Prefect-scheduled task runs for more than 1 minute?

Edit: I currently run the flow only locally, with `python name_of_script.py`.
Edit 2: I'm running the Python environment in WSL2.
Edit 3: I use GCS storage as my default storage. Maybe this causes the problem?
Edit 4: I was able to work around the issue by compressing the content of the response (with `zlib`) before returning it from my task. If I change the task as follows, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, so the whole flow breaks if the upload takes longer than that. I can live with this workaround for the moment, but I'd be happy to know if my "theory" about GCS being the problem is correct.
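```python
import zlib
from logging import Logger
import requests
from prefect import task

@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")  # "my_url" is a placeholder for the real endpoint
    # Compress the body so the persisted task result is much smaller
    return zlib.compress(response.content)
```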
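If the theory holds, the workaround helps because a smaller result uploads to GCS faster. A minimal sketch of the round trip, assuming the downstream consumer needs the original bytes back; `compress_result`, `decompress_result` and the sample payload are hypothetical, and the standard-library `pickle` is used here only to compare serialized sizes. How much compression actually saves depends on the real response body.

```python
import pickle
import zlib


def compress_result(payload: bytes, level: int = 6) -> bytes:
    """Compress raw response bytes before returning them from a task (hypothetical helper)."""
    return zlib.compress(payload, level)


def decompress_result(packed: bytes) -> bytes:
    """Recover the original bytes wherever the task result is consumed (hypothetical helper)."""
    return zlib.decompress(packed)


if __name__ == "__main__":
    # Hypothetical stand-in for response.content: repetitive JSON compresses well.
    payload = b'{"subscription_id": 12345, "status": "active"}\n' * 50_000
    packed = compress_result(payload)

    # Compare the serialized size with and without compression.
    print(f"pickled raw:        {len(pickle.dumps(payload)):,} bytes")
    print(f"pickled compressed: {len(pickle.dumps(packed)):,} bytes")

    # The round trip must be lossless.
    assert decompress_result(packed) == payload
```

Any step that reads the task result then has to call `zlib.decompress` before using the data.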
```
Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 520, in orchestrate_flow_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/home/andreas/github/datawarehouse/prefect-data-integrations/kayak/kayak_integration_flows.py", line 122, in integrate_kayak_subscriptions
    print(subs_response.result())
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/futures.py", line 209, in result
    return sync(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 221, in sync
    return run_async_from_worker_thread(__async_fn, *args, **kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 136, in result
    raise data
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/task_runners.py", line 212, in submit
    result = await run_fn(**run_kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 793, in begin_task_run
    return await orchestrate_task_run(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 927, in orchestrate_task_run
    await client.persist_data(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 1538, in persist_data
    storage_token = await block.write(data)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/blocks/storage.py", line 309, in write
    await run_sync_in_worker_thread(upload)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2844, in upload_from_string
    self.upload_from_file(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2571, in upload_from_file
    created_json = self._do_upload(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2388, in _do_upload
    response = self._do_resumable_upload(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/cloud/storage/blob.py", line 2232, in _do_resumable_upload
    response = upload.transmit_next_chunk(transport, timeout=timeout)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/upload.py", line 515, in transmit_next_chunk
    return _request_helpers.wait_and_retry(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/_request_helpers.py", line 171, in wait_and_retry
    raise error
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/_request_helpers.py", line 148, in wait_and_retry
    response = func()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/resumable_media/requests/upload.py", line 507, in retriable_request
    result = transport.request(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/google/auth/transport/requests.py", line 549, in request
    response = super(AuthorizedSession, self).request(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
```
```
Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
    await self._stream.send(item=buffer)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/streams/tls.py", line 202, in send
    await self._call_sslobject_method(self._ssl_object.write, item)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/streams/tls.py", line 168, in _call_sslobject_method
    await self.transport_stream.send(self._write_bio.read())
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 1307, in send
    await self._protocol.write_event.wait()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 8, in map_exceptions
    yield
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
    await self._stream.send(item=buffer)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 60, in map_httpcore_exceptions
    yield
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
    raise exc
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 75, in handle_async_request
    await self._send_request_body(**kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 128, in _send_request_body
    await self._send_event(event, timeout=timeout)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_async/http11.py", line 137, in _send_event
    await self._network_stream.write(bytes_to_send, timeout=timeout)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/backends/asyncio.py", line 49, in write
    await self._stream.send(item=buffer)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpcore/_exceptions.py", line 12, in map_exceptions
    raise to_exc(exc)
httpcore.WriteTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/andreas/github/datawarehouse/prefect-data-integrations/kayak/kayak_integration_flows.py", line 211, in <module>
    integrate_kayak_subscriptions()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/flows.py", line 370, in __call__
    return enter_flow_run_engine_from_flow_call(self, parameters)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 128, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 192, in create_then_begin_flow_run
    return await begin_flow_run(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 314, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 199, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/engine.py", line 1023, in report_flow_run_crashes
    await client.set_flow_run_state(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 1642, in set_flow_run_state
    response = await self._client.post(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/prefect/client.py", line 258, in send
    await super().send(*args, **kwargs)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/andreas/miniconda3/envs/python_prefect_test/lib/python3.9/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.WriteTimeout
```
> I'd be happy to know if my "theory" about GCS being the problem is correct.

I believe your theory is correct. Currently, task run results are by default stored as cloudpickled files in the default storage. I believe we can improve what happens when results cannot be stored because of issues connecting to remote storage. Thanks for reporting this, and nice work finding a workaround! I'll open an issue to investigate a more robust approach.

@Marvin open "Improve timeout logic for handling issues when task run results cannot be written to a remote storage"
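The traceback shows the failure happens inside google-cloud-storage's `upload_from_string`, which passes a `timeout` down to each chunk upload (`transmit_next_chunk(transport, timeout=timeout)`), so a large enough result cannot finish in time. One hypothetical direction for the issue, sketched purely as an illustration and not as what Prefect actually implemented, is to derive the upload timeout from the payload size instead of relying on a single fixed value. The function name `upload_timeout_seconds`, the assumed bandwidth, the safety factor and the 60-second floor are all assumptions.

```python
import math


def upload_timeout_seconds(
    payload_bytes: int,
    assumed_bandwidth_bytes_per_s: float,
    safety_factor: float = 2.0,
    minimum_s: float = 60.0,
) -> float:
    """Return an upload timeout that grows with the payload size.

    Hypothetical helper: the bandwidth, safety factor and floor are
    illustrative assumptions, not Prefect or google-cloud-storage settings.
    """
    if assumed_bandwidth_bytes_per_s <= 0:
        raise ValueError("assumed_bandwidth_bytes_per_s must be positive")
    # Time the upload should take at the assumed bandwidth.
    expected_s = payload_bytes / assumed_bandwidth_bytes_per_s
    # Leave headroom for slow links, but never go below the fixed floor.
    return max(minimum_s, float(math.ceil(expected_s * safety_factor)))


if __name__ == "__main__":
    # 100 MB at an assumed 1 MB/s: ~100 s expected, so a fixed 60 s limit would fail.
    print(upload_timeout_seconds(100_000_000, 1_000_000))  # 200.0
    # 1 MB at the same bandwidth stays at the 60 s floor.
    print(upload_timeout_seconds(1_000_000, 1_000_000))  # 60.0
```

Under these assumptions, any result expected to upload within 30 seconds keeps the 60-second floor, while larger payloads get proportionally more time.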