
Oscar Björhn

03/24/2023, 12:49 PM
A few of my flows that used to work in 2.8.6 (as well as previous versions) fail in 2.8.7 with concurrency-related errors. Should this be reported or are the changes expected to break some existing flows?
Exceptions here:
Encountered exception during execution:
Traceback (most recent call last):
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/prefect/engine.py", line 1545, in orchestrate_task_run
    result = await call.aresult()
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "orchestration/flows/raw_to_harmonized.py", line 41, in preprocess_task
    return await preprocess(
  File "/opt/prefect/pipeline/pipeline/raw_to_harmonized.py", line 66, in preprocess
    await snowflake_client.configure_catalog_collection()
  File "/opt/prefect/pipeline/pipeline/clients/snowflake_client.py", line 300, in configure_catalog_collection
    catalog_raw = await self.load_catalog(scope=scope)
  File "/opt/prefect/pipeline/pipeline/clients/snowflake_client.py", line 321, in load_catalog
    if await blob_client.exists():
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py", line 79, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py", line 618, in exists
    await self._client.blob.get_properties(
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py", line 79, in wrapper_use_tracer
    return await func(*args, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py", line 375, in get_properties
    pipeline_response = await self._client._pipeline.run(  # type: ignore # pylint: disable=protected-access
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 215, in run
    return await first_node.send(pipeline_request)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  [Previous line repeated 3 more times]
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication_async.py", line 73, in send
    await await_result(self.on_request, request)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_tools_async.py", line 36, in await_result
    return await result  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/policies/_authentication_async.py", line 51, in on_request
    self._token = await self._credential.get_token(*self._scopes)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_credentials/default.py", line 144, in get_token
    return await self._successful_credential.get_token(*scopes, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_internal/decorators.py", line 21, in wrapper
    token = await fn(*args, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_credentials/managed_identity.py", line 121, in get_token
    return await self._credential.get_token(*scopes, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_internal/get_token_mixin.py", line 66, in get_token
    token = await self._request_token(*scopes, **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_credentials/imds.py", line 67, in _request_token
    token = await self._client.request_token(*scopes, headers={"Metadata": "true"})
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/identity/aio/_internal/managed_identity_client.py", line 36, in request_token
    response = await self._pipeline.run(request, retry_on_methods=[request.method], **kwargs)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 215, in run
    return await first_node.send(pipeline_request)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  [Previous line repeated 1 more time]
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/policies/_retry_async.py", line 147, in send
    response = await self.next.send(request)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 83, in send
    response = await self.next.send(request)  # type: ignore
  [Previous line repeated 1 more time]
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/_base_async.py", line 116, in send
    await self._sender.send(request.http_request, **request.context.options),
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/azure/core/pipeline/transport/_aiohttp.py", line 229, in send
    result = await self.session.request(    # type: ignore
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
RuntimeError: Task <Task pending name='Task-44' coro=<Call._run_async() running at /root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.9/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
01:45:55 PM
Preprocess Entities-0
prefect.task_runs
Finished in state Failed("Task run encountered an exception: RuntimeError: Task <Task pending name='Task-44' coro=<Call._run_async() running at /root/.pyenv/versions/3.8.9/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[_run_until_complete_cb() at /root/.pyenv/versions/3.8.9/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop\n")
01:45:56 PM
Preprocess Entities-0
prefect.task_runs
Future exception was never retrieved
future: <Future finished exception=ServerDisconnectedError('Server disconnected')>
aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected
01:45:56 PM
Preprocess Entities-0
asyncio

Zanie

03/24/2023, 2:37 PM
The intent was for there to be no breaking changes, but we understand that there are some cases where running things on different threads would cause problems.
If you can produce an MRE I’d happily either fix the problem or give suggestions on how you should change your code.

Oscar Björhn

03/24/2023, 2:40 PM
That sounds great, thank you! I am currently running all flows in 2.8.7 so I can narrow down exactly which ones aren't working. Most seem fine. After that, I'll try to put together an MRE.

Zanie

03/24/2023, 2:42 PM
It looks like maybe you’re sharing an async client across tasks that are sync? or tasks that are async in a sync flow?
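For illustration only (this is not code from the thread): the error at the end of the traceback can be reproduced with the standard library alone. The sketch below assumes nothing about Prefect or the Azure SDK, and every helper name in it is hypothetical. It creates a Future on one event loop in a worker thread and then awaits it from a second loop, which is roughly what may happen when an async client built on one loop is used from a task running on another.

```python
import asyncio
import threading


def make_future_on_other_loop():
    """Create a pending Future bound to an event loop owned by another thread."""
    holder = {}

    def worker():
        loop = asyncio.new_event_loop()
        # The Future remembers the loop that created it.
        holder["future"] = loop.create_future()
        loop.close()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return holder["future"]


async def await_foreign(fut):
    # Awaiting a pending Future that belongs to a different loop is rejected
    # by the Task machinery of the currently running loop.
    await fut


def reproduce():
    """Return the error message raised when the foreign Future is awaited."""
    fut = make_future_on_other_loop()
    try:
        asyncio.run(await_foreign(fut))  # asyncio.run creates a second loop
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(reproduce())
```

The message returned by `reproduce()` ends with "attached to a different loop", the same wording as the `RuntimeError` in the traceback above.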

Oscar Björhn

03/24/2023, 2:43 PM
It's quite possible. So far I've isolated the problem to our more uh.. "creative" flows, that fire up hundreds of ACI (Azure Container Instance) jobs. And the engineer who wrote these flows just left on a 1-month vacation today.. 😅
I do see a few sync tasks that receive an object of type SnowflakeClient (something we built, not an official Snowflake class). It likely does a lot of async stuff, yeah..

Zanie

03/24/2023, 2:47 PM
In the traceback it looks like it's the Azure client.

Oscar Björhn

03/24/2023, 2:47 PM
Oh, yeah that's probably it.
Right, had some time to dig into it further. This is, in fact, the only flow with issues. As far as I can tell, it's an async flow calling async functions all the way down to the exception. Might be missing something though.
Producing an MRE will probably be difficult; it's a complex flow with very complex classes. Will likely roll back to 2.8.6 until the engineer who wrote this has returned from vacation. Appreciate the help! Wish I could dig further but don't have the time to allocate right now, as long as 2.8.6 keeps working.. 🙂

Zanie

03/24/2023, 4:23 PM
Hm, I think we're not yet scheduling tasks back on the main thread where feasible, so that's probably it, even if you have an async task in an async flow.
I’ll look into a solution for that, thanks for raising!
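One possible user-side workaround, sketched under assumptions and not a change discussed in the thread: keep one client per event loop instead of one shared instance, so code running on a different thread and loop never touches a client bound elsewhere. `PerLoopClients` and its `factory` argument are hypothetical names, not part of Prefect or the Azure SDK, and the sketch assumes the standard asyncio event loop classes, which can be weakly referenced.

```python
import asyncio
import threading
import weakref


class PerLoopClients:
    """Hand out one client per running event loop (hypothetical helper)."""

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable that builds a client
        # Weak keys let an entry disappear once its loop is garbage collected.
        self._clients = weakref.WeakKeyDictionary()
        self._lock = threading.Lock()

    def get(self):
        # Must be called from inside a coroutine, where a loop is running.
        loop = asyncio.get_running_loop()
        with self._lock:
            if loop not in self._clients:
                self._clients[loop] = self._factory()
            return self._clients[loop]


def demo():
    """Fetch the client twice on each of two loops running in separate threads."""
    clients = PerLoopClients(factory=object)  # object() stands in for a real client
    results = []

    async def grab():
        return clients.get(), clients.get()

    def worker():
        results.append(asyncio.run(grab()))

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

In `demo()`, both calls on the same loop return the same object, while the two threads (each with its own loop) receive different objects. A real client would also need to be closed on the loop that created it, which this sketch leaves out.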

Oscar Björhn

03/24/2023, 4:26 PM
No problem! If there is something specific you'd like me to test or if you get some code in a branch I don't mind giving that a go. I've "patched" local prefect installs with PR branches before.

Zanie

03/24/2023, 4:28 PM
Sweet!