# ask-marvin
j
@Marvin the docs seem to indicate that I can nest calls to
task.map
- for example,
Copy code
from prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str,str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'


@task(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f'check_outer({bucket}, {keys})...')
    result = check_inner.map([(bucket, k) for k in keys]).result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result


@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()
    out = check_outer.map(unmapped(bucket), [keys1, keys2]).result()
    print(f'out={out}')


if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'some-bucket')
However, I've also seen stuff online indicating that you can't call map from inside a task. Interestingly, when I run this flow locally it works great. However, when I run it in an ECS Prefect 3 deployment the tasks hang forever and produce no logs. I've already done a whole thing to make sure that websockets can talk to my server, so I don't think it's that. Could something be timing out? Or is it true that you can't do nested task maps like this?
m
thought for 407.1 seconds
n
can you share where you've seen this?
I've also seen stuff online indicating that you can't call map from inside a task
you can indeed nest map calls
n
yea that's from 2022
j
Ok so old news.
I bet it's a server thing then.
n
hmm
j
Let me try increasing timeouts - some of the tasks I have can run for hours (they call aws batch jobs that do the heavy lifting).
n
i see you say it hangs, but are there logs before it hangs?
or even which task invocations hang might be helpful to know
j
So if I set debug logging, I get some messages in the task:
Copy code
Created task run 'check_outer-2e7' for task 'check_outer'
Executing task 'check_outer' for task run 'check_outer-2e7'...
And then nothing
n
so it's the first time you try to use the task runner, specifically in ECS
j
The check_outer task hangs; I don't see any runs for check_inner.
👍 1
It's specifically in ECS, but using a DaskTaskRunner (it doesn't make sense to spin up a new ECS container for most tasks).
This doesn't occur when I run the script on my laptop with PREFECT_API_URL set to point to my prefect server.
m
thought for 396.8 seconds
j
Which makes me think it's a server issue.
m
Short answer:
- Yes, Prefect 3 supports calling task.map() from inside a task, but it's easy to deadlock if the task that's doing the mapping blocks on .result() while holding the only available worker thread/CPU.
- What you're seeing in ECS is very likely a resource deadlock: the "outer" task is blocking on .result() while the Dask cluster doesn't have spare capacity to run the "inner" mapped tasks, so nothing advances and you see no logs.

Why it works locally but hangs in ECS
- Locally, you probably have multiple cores/threads available, so the inner mapped tasks can actually run in parallel while the outer task is blocked.
- In ECS, if your container is limited to 1 vCPU (or your Dask cluster has only 1 worker with 1 thread), the outer task occupies the only worker thread while it waits on .result(). The inner tasks never get scheduled, so it hangs "forever".

Recommended patterns to avoid this

1) Keep orchestration in flows (preferred):
- Make check_outer a flow (a subflow) and do the mapping there. Tasks should generally do work; flows (or subflows) should orchestrate.
- Example:
Copy code
from prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@flow(name='check-outer', log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f'check_outer({bucket}, {keys})...')
    futures = check_inner.map([(bucket, k) for k in keys])
    result = futures.result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result

@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()

    # Call subflows; if you need these to run concurrently, call both without blocking,
    # then wait on their results:
    f1 = check_outer.submit(bucket, keys1)
    f2 = check_outer.submit(bucket, keys2)
    out = [f1.result(), f2.result()]
    print(f'out={out}')

if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'some-bucket')
2) If you must nest mapping inside a task, ensure there's spare capacity:
- Configure your Dask cluster to have more than one worker or more than one thread per worker so the inner tasks can run while the outer task is blocked:
Copy code
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 2},
)
- Also make sure your ECS task has enough CPU/memory to actually create those workers/threads (e.g., >1 vCPU).

3) If you don't need parallelism for the inner work, keep it simple:
- Inside the outer task, call the function implementation directly to avoid Prefect scheduling at that level:
Copy code
@task
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    # check_inner.fn(...) calls the underlying function directly, without creating task runs
    return [check_inner.fn((bucket, k)) for k in keys]
This avoids any chance of deadlock (but loses parallelism for the inner loop).

A couple more notes
- Blocking on .result() inside a task is the crux of the deadlock in distributed runners (Dask/Ray/etc.). Even using wait() still blocks that worker.
- If you move orchestration into flows/subflows, the Prefect engine can manage concurrency without holding a worker slot.
- Turn on debug logging in the ECS container to confirm Dask cluster creation and task scheduling: set PREFECT_LOGGING_LEVEL=DEBUG (one way to pass this through to the container is sketched below).

References
- Task runners in Prefect 3.x: Task runners (Prefect 3 docs)
- Prefect blog on mapping concepts: Beyond loops: mapping at scale
- Prefect Dask collection: prefect-dask repository

If you can share:
- Your ECS task CPU/memory settings
- How you've configured DaskTaskRunner (are you using a LocalCluster with defaults?)
…I can suggest an exact configuration that will unblock this. My bet is you're running with effectively a single worker/thread in ECS.
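For the debug-logging note above, here is a minimal sketch of one way to pass PREFECT_LOGGING_LEVEL=DEBUG through to the ECS container via deployment job variables; the work pool name, image, and deployment name below are placeholders, and the exact shape depends on how you create your deployments and on your ECS work pool's base job template:
Copy code
from prefect import flow

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    ...

if __name__ == "__main__":
    # Placeholder names throughout; adjust the work pool, image, and deployment name.
    example_flow.deploy(
        name="dask-test-debug",
        work_pool_name="my-ecs-pool",
        image="my-registry/my-flow-image:latest",
        build=False,   # assume the image is already built and pushed
        push=False,
        # Environment variables set here end up on the ECS task that runs the flow
        job_variables={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    )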
j
@Marvin I am using a local cluster with defaults. The agent is a fargate node with 512 cpu, corresponding to 0.5vcpu. The deadlock theory is definitely possible. If I use subflows, will those start new ECS tasks?
m
thought for 130.0 seconds
Short answer: nested mapping is supported in Prefect 3, but orchestrating new tasks from inside a running task while using Dask can deadlock if your cluster doesn't have free worker threads to run the "inner" tasks. That's very likely what you're seeing on ECS.

Why it works locally but hangs on ECS
- Your check_outer task calls check_inner.map(...).result() while it is itself running on a Dask worker. If all Dask worker slots are occupied by the "outer" mapped tasks, there are no free workers left to execute the "inner" mapped tasks. Since check_outer is blocked on .result(), you get a hang with no inner-task logs.
- Locally, you probably have a different Dask configuration (more threads/workers), so there is enough capacity to schedule the "inner" tasks and it completes.

A few ways to fix it

1) Prefer orchestration in flows/subflows (recommended)
- Move the inner mapping out of the task and into a flow (or convert check_outer into a subflow). This avoids blocking a Dask worker while orchestrating inner tasks. Example:
Copy code
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f"check_inner({obj})")
    return obj[1] == "b"

@flow
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    print(f"check_outer({bucket}, {keys})...")
    result = check_inner.map([(bucket, k) for k in keys]).result()
    print(f"check_outer({bucket}, {keys}) -> {result}")
    return result

@flow(name="dask-test", task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    keys1 = keys.copy()
    keys2 = keys.copy()
    # check_outer is a flow here, so call it directly as a subflow for each key list
    out = [check_outer(bucket, keys1), check_outer(bucket, keys2)]
    print(f"out={out}")

if __name__ == "__main__":
    example_flow(["a", "b", "c"], "some-bucket")
2) If you must keep the inner call inside a task, don't block inside the worker
- Avoid calling .result() inside the task. Instead, return the futures to the flow and resolve there. This prevents a worker from sitting blocked while waiting for work that also needs a worker.
Copy code
@task
def check_outer(bucket: str, keys: list[str]):
    futures = check_inner.map([(bucket, k) for k in keys])  # no .result()
    return futures  # return PrefectFutureList

@flow(task_runner=DaskTaskRunner())
def example_flow(keys: list[str], bucket: str):
    fut_lists = check_outer.map(unmapped(bucket), [keys, keys])
    # Resolve at the flow level
    results = [futs.result() for futs in fut_lists]
    print(results)
3) Ensure your Dask cluster has spare capacity for nested work
- Give Dask enough workers/threads so inner tasks can run while outer tasks are executing. For a temporary local cluster inside your ECS task:
Copy code
from prefect_dask import DaskTaskRunner

runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={
        "n_workers": 2,             # >= number of concurrent outer tasks
        "threads_per_worker": 2,    # allow nested work per worker
        "processes": False          # threads can help avoid deadlock in nested scenarios
    },
)
- Alternatively, connect DaskTaskRunner(address="tcp://...") to an existing scheduler with sufficient workers.

4) Sanity-check that it's Dask-related
- Temporarily switch to ConcurrentTaskRunner() or ThreadPoolTaskRunner() in ECS. If it runs, the issue is your Dask cluster config rather than Prefect or networking. (A sketch of this swap follows below.)

Other notes
- There's no Prefect-imposed timeout here unless you set a task_run_timeout. "No logs" usually means the task never started or is blocked before any prints (consistent with worker saturation).
- Defining tasks at module top-level as you have is good; that avoids serialization issues.

Docs for reference
- Dask task runner integration: Prefect Dask integration
- Task runners concept: Task runners in Prefect 3

If you can share:
- The ECS task CPU/memory, your DaskTaskRunner parameters (especially cluster_class/cluster_kwargs), and how many outer tasks you map at once
I can suggest a concrete cluster config that will avoid deadlocks for your workload.
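As a concrete version of the sanity check in point 4, here is a minimal sketch using Prefect 3's built-in ThreadPoolTaskRunner in place of Dask (the max_workers value is just an illustration); whether this completes or hangs in ECS tells you if the problem is the Dask cluster itself:
Copy code
from prefect import flow, task, unmapped
from prefect.task_runners import ThreadPoolTaskRunner

@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@task(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    return check_inner.map([(bucket, k) for k in keys]).result()

# Same flow shape as before, but with a thread-pool runner instead of Dask.
@flow(name='dask-test-threads', task_runner=ThreadPoolTaskRunner(max_workers=4), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    out = check_outer.map(unmapped(bucket), [keys.copy(), keys.copy()]).result()
    print(f'out={out}')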
Great question, and thanks for the extra details.
- With a LocalCluster on Fargate 512 CPU (0.5 vCPU), your container effectively has a single compute slot. That makes the nested-map-inside-a-task pattern prone to deadlock because the outer task blocks on .result() while the inner mapped tasks have nowhere to run.
- Subflows do not start new ECS tasks. Calling another @flow from within your flow runs in the same container/process as the parent. This shifts orchestration out of a task (so you avoid the deadlock), but it does not add infrastructure capacity.
- If you want separate ECS tasks, trigger a deployment instead (e.g., run_deployment(...)). That creates a new flow run on the target work pool and will launch separate infrastructure for it. The as_subflow=True flag just links the runs in the UI and waits for it; it still runs on the deployment's infrastructure.

What I'd do in your setup
- Convert check_outer to a subflow and call it from the parent flow. This avoids blocking inside a task and should stop the hang even on 0.5 vCPU.
- If you want actual parallel capacity, bump your ECS task CPU to at least 1 vCPU and explicitly size the LocalCluster.

Examples

1) Use subflows (no new ECS task; fixes the deadlock):
Copy code
from prefect import flow, task

@task(log_prints=True)
def check_inner(obj: tuple[str,str]) -> bool:
    print(f'check_inner({obj})')
    return obj[1] == 'b'

@flow(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    futures = check_inner.map([(bucket, k) for k in keys])
    return futures.result()

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    f1 = check_outer.submit(bucket, keys)
    f2 = check_outer.submit(bucket, keys)
    out = [f1.result(), f2.result()]
    print(f'out={out}')
2) If you do want more concurrent capacity in the same ECS task:
Copy code
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
), log_prints=True)
def example_flow(...):
    ...
Note: with 0.5 vCPU, more workers/threads won't help; consider 1–2 vCPU.

3) If you want each "outer" unit to run on its own ECS task:
- Create a deployment for the "outer" flow and call it with run_deployment(...) from your parent flow. That will spin up separate infra per call. (A sketch of this follows below.)

Helpful docs
- Task runners: Task runners (Prefect 3)
- Running deployments from flows: Run deployments

If you share your work pool type and how your ECS jobs are launched (ECS work pool vs. a long-running worker), I can suggest concrete settings for CPU/memory and Dask cluster sizing.
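For point 3, a minimal sketch of calling a pre-existing deployment from the parent flow; the deployment name "check-outer/ecs-check-outer" is a placeholder and assumes you have already deployed the outer flow to an ECS work pool:
Copy code
from prefect import flow
from prefect.deployments import run_deployment

@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    # Each run_deployment call creates a separate flow run on the deployment's
    # work pool, so each "outer" unit gets its own ECS task.
    runs = []
    for key_batch in (keys.copy(), keys.copy()):
        flow_run = run_deployment(
            name="check-outer/ecs-check-outer",   # placeholder "flow-name/deployment-name"
            parameters={"bucket": bucket, "keys": key_batch},
            as_subflow=True,  # link the child runs under this parent in the UI
        )
        runs.append(flow_run)
    # By default run_deployment waits for each child run to finish; pass timeout=0
    # if you want to fire them off without waiting.
    print(f'states={[r.state for r in runs]}')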