Jon Mason
09/05/2025, 7:54 PM
I'm trying to call task.map from inside another task - for example,
from prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@task(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f'check_outer({bucket}, {keys})...')
    result = check_inner.map([(bucket, k) for k in keys]).result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result

@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()
    out = check_outer.map(unmapped(bucket), [keys1, keys2]).result()
    print(f'out={out}')

if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'some-bucket')
However, I've also seen stuff online indicating that you can't call map from inside a task. Interestingly, when I run this flow locally it works great. However, when I run it in an ECS Prefect 3 deployment the tasks hang forever and produce no logs. I've already done a whole thing to make sure that websockets can talk to my server, so I don't think it's that. Could something be timing out? Or is it true that you can't do nested task maps like this?
Marvin
09/05/2025, 7:54 PM
Nate
09/05/2025, 7:54 PM
> I've also seen stuff online indicating that you can't call map from inside a task
you can indeed nest map calls
Nate
09/05/2025, 7:55 PM
Jon Mason
09/05/2025, 7:55 PM
Jon Mason
09/05/2025, 7:55 PM
Nate
09/05/2025, 7:55 PM
Jon Mason
09/05/2025, 7:56 PM
Nate
09/05/2025, 7:56 PM
Nate
09/05/2025, 7:57 PM
Jon Mason
09/05/2025, 7:57 PM
Created task run 'check_outer-2e7' for task 'check_outer'
Executing task 'check_outer' for task run 'check_outer-2e7'...
Jon Mason
09/05/2025, 7:57 PM
Nate
09/05/2025, 7:57 PM
Jon Mason
09/05/2025, 7:57 PM
check_outer task hangs, I don't see any runs for check_inner
Jon Mason
09/05/2025, 7:58 PM
Jon Mason
09/05/2025, 7:58 PM
Marvin
09/05/2025, 7:59 PM
Jon Mason
09/05/2025, 7:59 PM
Marvin
09/05/2025, 8:01 PM
- You can call task.map() from inside a task, but it's easy to deadlock if the task that's doing the mapping blocks on .result() while holding the only available worker thread/CPU.
- What you're seeing in ECS is very likely a resource deadlock: the “outer” task is blocking on .result() while the Dask cluster doesn't have spare capacity to run the “inner” mapped tasks, so nothing advances and you see no logs.
Why it works locally but hangs in ECS
- Locally, you probably have multiple cores/threads available, so the inner mapped tasks can actually run in parallel while the outer task is blocked.
- In ECS, if your container is limited to 1 vCPU (or your Dask cluster has only 1 worker with 1 thread), the outer task occupies the only worker thread while it waits on .result(). The inner tasks never get scheduled, so it hangs “forever”.
Recommended patterns to avoid this
1) Keep orchestration in flows (preferred):
- Make check_outer a flow (a subflow) and do the mapping there. Tasks should generally do work; flows (or subflows) should orchestrate.
- Example:
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@flow(name='check-outer', log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f'check_outer({bucket}, {keys})...')
    futures = check_inner.map([(bucket, k) for k in keys])
    result = futures.result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result

@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()
    # Subflows are called directly (flows have no .submit/.map); direct calls run
    # one after the other. For concurrent subflows, consider async flows with
    # asyncio.gather or run_deployment.
    out = [check_outer(bucket, keys1), check_outer(bucket, keys2)]
    print(f'out={out}')

if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'mb-ngs-portal-data-prod')
2) If you must nest mapping inside a task, ensure there’s spare capacity:
- Configure your Dask cluster to have more than one worker or more than one thread per worker so the inner tasks can run while the outer task is blocked:
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 2},
)
- Also make sure your ECS task has enough CPU/memory to actually create those workers/threads (e.g., >1 vCPU).
3) If you don’t need parallelism for the inner work, keep it simple:
- Inside the outer task, call the function implementation directly to avoid Prefect scheduling at that level:
@task
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    return [check_inner.fn((bucket, k)) for k in keys]
This avoids any chance of deadlock (but loses parallelism for the inner loop).
A couple more notes
- Blocking on .result() inside a task is the crux of the deadlock in distributed runners (Dask/Ray/etc.). Even using wait() still blocks that worker.
- If you move orchestration into flows/subflows, the Prefect engine can manage concurrency without holding a worker slot.
- Turn on debug logging in the ECS container to confirm Dask cluster creation and task scheduling: set PREFECT_LOGGING_LEVEL=DEBUG.
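For example, here's a minimal sketch of passing that environment variable through to the ECS container when deploying with flow.deploy(), assuming the ECS work pool's default job template (which exposes an env job variable); the work pool name and image are hypothetical placeholders:
from prefect import flow

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    ...  # flow body as above

if __name__ == "__main__":
    example_flow.deploy(
        name="dask-test-debug",
        work_pool_name="ecs-pool",             # hypothetical ECS work pool name
        image="my-registry/dask-test:latest",  # hypothetical image built/pushed separately
        build=False,
        push=False,
        # env vars here end up on the ECS task's container
        job_variables={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    )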
References
- Task runners in Prefect 3.x: Task runners (Prefect 3 docs)
- Prefect blog on mapping concepts: Beyond loops: mapping at scale
- Prefect Dask collection: prefect-dask repository
If you can share:
- Your ECS task CPU/memory settings
Marvin
09/05/2025, 8:01 PM
- Your DaskTaskRunner configuration (are you using a LocalCluster with defaults?)
…I can suggest an exact configuration that will unblock this. My bet is you're running with effectively a single worker/thread in ECS.
Jon Mason
09/05/2025, 8:05 PM
Marvin
09/05/2025, 8:05 PM
Marvin
09/05/2025, 8:05 PM
- Your check_outer task calls check_inner.map(...).result() while it is itself running on a Dask worker. If all Dask worker slots are occupied by the “outer” mapped tasks, there are no free workers left to execute the “inner” mapped tasks. Since check_outer is blocked on .result(), you get a hang with no inner-task logs.
- Locally, you probably have a different Dask configuration (more threads/workers) so there is enough capacity to schedule the “inner” tasks and it completes.
A few ways to fix it
1) Prefer orchestration in flows/subflows (recommended)
- Move the inner mapping out of the task and into a flow (or convert check_outer into a subflow). This avoids blocking a Dask worker while orchestrating inner tasks.
Example:
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f"check_inner({obj})")
    return obj[1] == "b"

@flow
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f"check_outer({bucket}, {keys})...")
    result = check_inner.map([(bucket, k) for k in keys]).result()
    print(f"check_outer({bucket}, {keys}) -> {result}")
    return result

@flow(name="dask-test", task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()
    # check_outer is now a flow, so call it directly (flows have no .map)
    out = [check_outer(bucket, keys1), check_outer(bucket, keys2)]
    print(f"out={out}")

if __name__ == "__main__":
    example_flow(["a", "b", "c"], "some-bucket")
2) If you must keep the inner call inside a task, don’t block inside the worker
- Avoid calling .result() inside the task. Instead, return the futures to the flow and resolve there. This prevents a worker from sitting blocked while waiting for work that also needs a worker.
@task
def check_outer(bucket: str, keys: list[str]):
    futures = check_inner.map([(bucket, k) for k in keys])  # no .result()
    return futures  # return PrefectFutureList

@flow(task_runner=DaskTaskRunner())
def example_flow(keys: list[str], bucket: str):
    fut_lists = check_outer.map(unmapped(bucket), [keys, keys])
    # Resolve at the flow level
    results = [futs.result() for futs in fut_lists]
    print(results)
3) Ensure your Dask cluster has spare capacity for nested work
- Give Dask enough workers/threads so inner tasks can run while outer tasks are executing. For a temporary local cluster inside your ECS task:
from prefect_dask import DaskTaskRunner

runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={
        "n_workers": 2,           # >= number of concurrent outer tasks
        "threads_per_worker": 2,  # allow nested work per worker
        "processes": False,       # threads can help avoid deadlock in nested scenarios
    },
)
- Alternatively, connect DaskTaskRunner(address="tcp://...") to an existing scheduler with sufficient workers.
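For instance, a minimal sketch of pointing the runner at an existing scheduler; the address is hypothetical, so substitute whatever is reachable from inside your ECS task:
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(address="tcp://dask-scheduler.internal:8786"),  # hypothetical address
    log_prints=True,
)
def example_flow(keys: list[str], bucket: str):
    ...  # same body as the examples above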
4) Sanity-check that it’s Dask-related
- Temporarily switch to ThreadPoolTaskRunner() (Prefect 3's replacement for the older ConcurrentTaskRunner) in ECS. If it runs, the issue is your Dask cluster config rather than Prefect or networking.
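As an example, a minimal sketch of that sanity check, assuming Prefect 3's built-in ThreadPoolTaskRunner and the same task shapes as the original flow:
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    return obj[1] == "b"

@task(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    # same nested map as the original example
    return check_inner.map([(bucket, k) for k in keys]).result()

# Dask swapped out for a thread pool, to isolate whether the hang is specific
# to the Dask cluster configuration
@flow(name="dask-test-threads", task_runner=ThreadPoolTaskRunner(max_workers=8), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    return check_outer.map(unmapped(bucket), [keys.copy(), keys.copy()]).result()

if __name__ == "__main__":
    print(example_flow(["a", "b", "c"], "some-bucket"))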
Other notes
- There's no Prefect-imposed timeout here unless you set one explicitly (e.g. timeout_seconds on the task; see the sketch after these notes). “No logs” usually means the task never started or is blocked before any prints (consistent with worker saturation).
- Defining tasks at module top-level as you have is good; that avoids serialization issues.
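A minimal sketch of opting into such a timeout, so a wedged task eventually fails instead of hanging; the 300-second value is arbitrary:
from prefect import task

@task(timeout_seconds=300)  # fail the task run if it exceeds 5 minutes
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    ...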
Docs for reference
- Dask task runner integration: Prefect Dask integration
Marvin
09/05/2025, 8:05 PM
If you can share your DaskTaskRunner parameters (especially cluster_class/cluster_kwargs), and how many outer tasks you map at once, I can suggest a concrete cluster config that will avoid deadlocks for your workload.
09/05/2025, 8:07 PM
- To recap: the deadlock comes from the outer task blocking on .result() while the inner mapped tasks have nowhere to run.
- Subflows do not start new ECS tasks. Calling another @flow from within your flow runs in the same container/process as the parent. This shifts orchestration out of a task (so you avoid the deadlock), but it does not add infrastructure capacity.
- If you want separate ECS tasks, trigger a deployment instead (e.g., run_deployment(...)). That creates a new flow run on the target work pool and will launch separate infrastructure for it. The as_subflow=True flag just links the runs in the UI and waits for it; it still runs on the deployment's infrastructure.
What I'd do in your setup
- Convert check_outer to a subflow and call it from the parent flow. This avoids blocking inside a task and should stop the hang even on 0.5 vCPU.
- If you want actual parallel capacity, bump your ECS task CPU to at least 1 vCPU and explicitly size the LocalCluster.
Examples
1) Use subflows (no new ECS task; fixes the deadlock):
from prefect import flow, task

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@flow(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    futures = check_inner.map([(bucket, k) for k in keys])
    return futures.result()

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    # Subflow calls are plain function calls (flows have no .submit), so these
    # run one after the other; for separate infrastructure per call, see option 3 below.
    out = [check_outer(bucket, keys), check_outer(bucket, keys)]
    print(f'out={out}')
2) If you do want more concurrent capacity in the same ECS task:
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        cluster_class="distributed.LocalCluster",
        cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
    ),
    log_prints=True,
)
def example_flow(keys: list[str], bucket: str):
    ...
Note: with 0.5 vCPU, more workers/threads won’t help; consider 1–2 vCPU.
3) If you want each “outer” unit to run on its own ECS task:
- Create a deployment for the "outer" flow and call it with run_deployment(...) from your parent flow. That will spin up separate infra per call.
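A minimal sketch of what that could look like, assuming you've already created a deployment of the outer flow named "check-outer/ecs-outer" on an ECS work pool (both names are hypothetical):
from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    runs = []
    for batch in (keys.copy(), keys.copy()):
        # each call creates a new flow run on the deployment's work pool, so the
        # ECS worker launches separate infrastructure for it; by default this
        # blocks until that run finishes and links it in the UI as a subflow
        runs.append(
            run_deployment(
                name="check-outer/ecs-outer",
                parameters={"bucket": bucket, "keys": batch},
                as_subflow=True,
            )
        )
    print([run.state.type if run.state else None for run in runs])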
Helpful docs
- Task runners: Task runners (Prefect 3)
- Running deployments from flows: Run deployments
If you share your work pool type and how your ECS jobs are launched (ECS work pool vs. a long-running worker), I can suggest concrete settings for CPU/memory and Dask cluster sizing.