Andrew Xue (08/19/2025, 9:51 PM)

Marvin (08/19/2025, 9:52 PM)

Nate (08/19/2025, 9:52 PM)

Nate (08/19/2025, 9:52 PM)
asyncio?

Andrew Xue (08/19/2025, 9:54 PM)

Marvin (08/19/2025, 9:55 PM)
1) Use run_deployment inside a parent flow and kick off many runs at once.
Example:
import asyncio

from prefect import flow
from prefect.deployments import run_deployment

@flow
async def parent():
    # run_deployment is a plain function (it has no .submit), so fan
    # out with asyncio.gather; each call returns the child flow run's
    # final State once it completes.
    coros = [
        run_deployment(
            "project-name/child-flow",  # deployment name
            parameters={"i": i},
        )
        for i in range(10)
    ]
    states = await asyncio.gather(*coros)
    return states
- Pros: Scales across machines, respects work pool concurrency limits, no GIL drama.
- Cons: You get separate flow runs (operationally great, function-call semantics… less so).
- Docs: Run work concurrently
2) Wrap subflow calls in tasks and use a task runner (threads) for in-process concurrency
- Flows don’t have .submit, but tasks do. Wrap the subflow call in a task and run many in parallel with a concurrent task runner.
Example:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@flow
def child(i: int) -> int:
    return i * 2

@task
def run_child(i: int) -> int:
    # Calling a flow from a task creates a subflow run
    return child(i)

@flow(task_runner=ConcurrentTaskRunner())
def parent():
    futs = [run_child.submit(i) for i in range(10)]
    return [f.result() for f in futs]
- Pros: Simple, stays in a single process.
- Cons: Still limited by the parent’s resources (threads, not distributed).
3) Use Ray or Dask task runners for distributed concurrency (tasks, not flows)
- Same wrapping pattern as above, but swap in a Ray/Dask task runner to distribute work.
- Best when the “subflow” logic can be a task or is trivially wrapped as one.
- Docs: Prefect + Ray
4) Fan-out via API/Client or schedules
- You can also use the Prefect client to create multiple flow runs programmatically or rely on schedules that naturally produce parallel runs via your work pool’s workers and concurrency limits.
Notes and caveats
- Flows don’t support `.submit`; that’s by design. For parallel “flow-like” work inside one process, wrap the call in a task and submit that.
- For robust parallelism and isolation, prefer deployments + work pools (option 1). It scales and plays well with concurrency limits and retries.
- If you just need parallel compute, consider making the “subflow” logic a task and use a task runner (option 2/3).
If you can share whether you need separate flow runs (observability/isolation) or just parallel compute, I can point you to the cleanest pattern. I will try to contain my excitement.

Nate (08/19/2025, 9:58 PM)
asyncio is usually plenty
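The asyncio fan-out pattern is the same whether or not Prefect is involved. A minimal sketch of it in plain asyncio (the `@flow` decorators are omitted so it runs standalone; in Prefect you would decorate both functions with `@flow` and awaiting `child` would create a subflow run):

```python
import asyncio

async def child(i: int) -> int:
    # Stand-in for a subflow's real work
    await asyncio.sleep(0.01)
    return i * 2

async def parent() -> list[int]:
    # Fan out: schedule all children concurrently, then wait for every result
    return await asyncio.gather(*[child(i) for i in range(10)])

results = asyncio.run(parent())
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With `asyncio.gather` the results come back in submission order, which keeps the parent's return value deterministic even though the children run concurrently.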
Nate (08/19/2025, 9:59 PM)

Nate (08/19/2025, 10:05 PM)

Andrew Xue (08/19/2025, 10:11 PM)
prefect.events.clients - Unable to connect to 'wss://...'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: timed out during opening handshake. Set PREFECT_DEBUG_MODE=1 to see the full error.
21:42:54.555 | ERROR | GlobalEventLoopThread | prefect._internal.concurrency - Service 'EventsWorker' failed to process item Event(...)
We would get this in the middle of a run suddenly, usually right after we created a bunch of tasks (thousands). Our guess was that we had too many tasks under a single flow, so we decided to divide the tasks further into subflows. This seemed(?) to fix the issue. Also, the SQL db backing our Prefect server was really small/weak and was recently upgraded, so that could have been a fix too.
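The splitting described above (many tasks under one flow divided into smaller subflows) boils down to a chunking helper. A minimal sketch, where the name `chunk` and the batch size are illustrative and not from this thread; each yielded slice would be handed to one subflow run:

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

def chunk(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    # Yield successive fixed-size slices of the work items; each slice
    # becomes the parameter set for one subflow run, capping how many
    # task runs any single flow run has to track.
    for start in range(0, len(items), size):
        yield items[start:start + size]

# e.g. 5,000 work items split into 10 subflows of 500 tasks each
batches = list(chunk(range(5000), 500))
print(len(batches))  # 10
```

Keeping each subflow to a few hundred task runs bounds the per-flow-run event traffic, which is the plausible mechanism behind the fix described above.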
Nate (08/19/2025, 10:12 PM)
websockets that people have encountered

Andrew Xue (08/19/2025, 10:13 PM)

Andrew Xue (08/19/2025, 10:15 PM)

Nate (08/19/2025, 10:18 PM)
can you set PREFECT_DEBUG_MODE as the error suggests and share it?

Andrew Xue (08/25/2025, 6:45 PM)
Unable to connect to 'wss://.../api/events/in'. Please check your network settings to ensure websocket connections to the API are allowed. Otherwise event data (including task run data) may be lost. Reason: timed out during opening handshake. Set PREFECT_DEBUG_MODE=1 to see the full error.
(task, pid=2128) Traceback (most recent call last):
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/websockets/asyncio/client.py", line 543, in __await_impl__
(task, pid=2128) await self.connection.handshake(
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/websockets/asyncio/client.py", line 104, in handshake
(task, pid=2128) await asyncio.wait(
(task, pid=2128) File "/home/sky/.local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/asyncio/tasks.py", line 464, in wait
(task, pid=2128) return await _wait(fs, timeout, return_when, loop)
(task, pid=2128) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(task, pid=2128) File "/home/sky/.local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/asyncio/tasks.py", line 550, in _wait
(task, pid=2128) await waiter
(task, pid=2128) asyncio.exceptions.CancelledError
(task, pid=2128)
(task, pid=2128) The above exception was the direct cause of the following exception:
(task, pid=2128)
(task, pid=2128) Traceback (most recent call last):
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/websockets/asyncio/client.py", line 539, in __await_impl__
(task, pid=2128) async with asyncio_timeout(self.open_timeout):
(task, pid=2128) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(task, pid=2128) File "/home/sky/.local/share/uv/python/cpython-3.12.11-linux-x86_64-gnu/lib/python3.12/asyncio/timeouts.py", line 115, in __aexit__
(task, pid=2128) raise TimeoutError from exc_val
(task, pid=2128) TimeoutError
(task, pid=2128)
(task, pid=2128) The above exception was the direct cause of the following exception:
(task, pid=2128)
(task, pid=2128) Traceback (most recent call last):
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/prefect/events/clients.py", line 311, in _reconnect
(task, pid=2128) self._websocket = await self._connect.__aenter__()
(task, pid=2128) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/websockets/asyncio/client.py", line 587, in __aenter__
(task, pid=2128) return await self
(task, pid=2128) ^^^^^^^^^^
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/prefect/_internal/websockets.py", line 135, in _proxy_connect
(task, pid=2128) proto = await self.__await_impl__()
(task, pid=2128) ^^^^^^^^^^^^^^^^^^^^^^^^^^^
(task, pid=2128) File "/home/sky/.cache/uv/archive-v0/alZJynNjk-nxPCTfqOdwr/lib/python3.12/site-packages/websockets/asyncio/client.py", line 578, in __await_impl__
(task, pid=2128) raise TimeoutError("timed out during opening handshake") from exc
(task, pid=2128) TimeoutError: timed out during opening handshake