
Paco Ibañez

03/29/2023, 7:06 PM
Hello! I have a Docker image that contains several flows, and one of them is the one that gets invoked by Prefect as a Docker container or k8s Job. How can I call other flows from the first one if the list of flows to run is dynamic (a list of <prefect.flows.Flow object at ..>)? When I call the prefect.flows.Flow I get a traceback (see thread). Thank you!
RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /usr/local/lib/python3.8/site-packages/anyio/from_thread.py:219> cb=[TaskGroup._spawn.<locals>.task_done() at /usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop

Zanie

03/29/2023, 7:26 PM
I’d recommend trying the latest version where we no longer use blocking portals
If that doesn’t resolve your issue, I’d be curious to see a minimal example

Paco Ibañez

03/29/2023, 7:33 PM
I am on 2.8.7

Zanie

03/29/2023, 7:34 PM
Oh interesting
Can you share the full traceback?

Paco Ibañez

03/29/2023, 7:35 PM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 643, in orchestrate_flow_run
    result = await flow_call()
  File "cdp_orchestrator_flow.py", line 55, in cdp_orchestrator
    await orchestrator.run()
  File "/src/pipelines/orchestrator/core/orchestrator.py", line 227, in run
    flow_fn.__call__()
  File "/usr/local/lib/python3.8/site-packages/prefect/flows.py", line 456, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 180, in enter_flow_run_engine_from_flow_call
    return parent_flow_run_context.sync_portal.call(begin_run)
  File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 283, in call
    return cast(T_Retval, self.start_task_soon(func, *args).result())
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 219, in _call_func
    retval = await retval
  File "/usr/local/lib/python3.8/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 448, in create_and_begin_subflow_run
    parent_task_run = await client.create_task_run(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/orion.py", line 1779, in create_task_run
    response = await self._client.post(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 229, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 187, in _send_with_retry
    response = await request()
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /usr/local/lib/python3.8/site-packages/anyio/from_thread.py:219> cb=[TaskGroup._spawn.<locals>.task_done() at /usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:726]> got Future <Future pending> attached to a different loop

Zanie

03/29/2023, 7:36 PM
Hm that doesn't look like 2.8.7

Paco Ibañez

03/29/2023, 7:37 PM
Ohh, the Docker image may have an older version. Server + agent are on 2.8.7. Let me rebuild the Docker image with the flows.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 673, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "cdp_orchestrator_flow.py", line 55, in cdp_orchestrator
    await orchestrator.run()
  File "/src/pipelines/orchestrator/core/orchestrator.py", line 218, in run
    self._step_prefect_info = await self._load_step_prefect_info()
  File "/src/pipelines/orchestrator/core/orchestrator.py", line 152, in _load_step_prefect_info
    deployments = await self.prefect_client.read_deployments(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/orchestration.py", line 1567, in read_deployments
    response = await self._client.post("/deployments/filter", json=body)
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 243, in send
    response = await self._send_with_retry(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/base.py", line 189, in _send_with_retry
    response = await request()
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
    response = await connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
    return await self._connection.handle_async_request(request)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 112, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 91, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 155, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
  File "/usr/local/lib/python3.8/site-packages/httpcore/_async/http11.py", line 191, in _receive_event
    data = await self._network_stream.read(
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 34, in read
    return await self._stream.receive(max_bytes=max_bytes)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
RuntimeError: Task <Task pending name='Task-8' coro=<Call._run_async() running at /usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py:218> cb=[Call.run.<locals>.<lambda>() at /usr/local/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py:155]> got Future <Future pending> attached to a different loop
root@768c17c91454:/src/pipelines/orchestrator/flows# prefect version
Version:             2.8.7
API version:         0.8.4
Python version:      3.8.16
Git commit:          a6d6c6fc
Built:               Thu, Mar 23, 2023 3:27 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         server
Actually, this traceback happens before reaching the point where I call the subflow.
I have no idea what is going on, because that call was working before. I get the Prefect client from the flow run context:
from prefect.context import get_run_context
prefect_client = get_run_context().client
and I use it to gather info about the subflows the current flow needs to call.

Zanie

03/29/2023, 8:19 PM
Ah, you can't use the client from the context in 2.8.7.
We're working on fixing that, but it now runs in a different event loop from your tasks / flows.
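
For readers who hit the same "attached to a different loop" error, the mechanism can be reproduced with plain asyncio. This is only a sketch and not Prefect code: the function names are hypothetical, and it assumes nothing more than a Future that belongs to one event loop being awaited by a task running in another.

```python
import asyncio
from typing import Optional


async def await_foreign_future(fut: "asyncio.Future") -> None:
    # The task running this coroutine belongs to the loop started by
    # asyncio.run(), while `fut` was created on a different loop.
    await fut


def demo() -> Optional[str]:
    # A second loop standing in for the loop that owns the shared client
    # (hypothetical stand-in; it is never actually run here).
    other_loop = asyncio.new_event_loop()
    fut = other_loop.create_future()
    try:
        asyncio.run(await_foreign_future(fut))
    except RuntimeError as exc:
        # asyncio refuses to let a task wait on another loop's Future.
        return str(exc)
    finally:
        other_loop.close()
    return None


if __name__ == "__main__":
    print(demo())
```

The same rule applies to any object that holds loop-bound state, which is why a client created on one loop cannot be awaited from code running on another.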

Paco Ibañez

03/29/2023, 8:27 PM
Getting the client with async with get_client() as client: fixed it. Thank you!
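
A minimal sketch of that fix, assuming a Prefect 2.x install where get_client is importable from the top-level prefect package. The helper name load_deployment_names is hypothetical, and the unfiltered read_deployments() call is only an illustration of the kind of lookup described above, not the original orchestrator code.

```python
from typing import List


async def load_deployment_names() -> List[str]:
    """List deployment names using a client opened in the current event loop."""
    # Imported inside the function so this sketch can be loaded even where
    # Prefect is not installed (an assumption made for illustration only).
    from prefect import get_client

    # A fresh client is created and closed on the loop that runs this
    # coroutine, instead of reusing get_run_context().client.
    async with get_client() as client:
        deployments = await client.read_deployments()
        return [deployment.name for deployment in deployments]
```

Inside an async flow this would be called as names = await load_deployment_names(), in place of the call that used the client from the run context.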

Zanie

03/29/2023, 8:27 PM
No problem!