Jon Mason
09/05/2025, 7:54 PM — re: task.map
from prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner
@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    """Return True when the key half of the (bucket, key) pair *obj* is 'b'."""
    print(f'check_inner({obj})')
    _, key = obj
    return key == 'b'
@task(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    """Fan out check_inner over (bucket, key) pairs and block on all results."""
    print(f'check_outer({bucket}, {keys})...')
    # NOTE(review): calling .map(...).result() from *inside* a @task can deadlock
    # under DaskTaskRunner — the inner tasks wait on the same worker pool that the
    # outer task occupies. Confirm; the usual fix is to make this a subflow.
    result = check_inner.map([(bucket, k) for k in keys]).result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result
@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    """Map check_outer over two independent copies of *keys* and print the output."""
    first_copy = keys.copy()
    second_copy = keys.copy()
    out = check_outer.map(unmapped(bucket), [first_copy, second_copy]).result()
    print(f'out={out}')
if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'some-bucket')
Marvin
09/05/2025, 7:54 PMNate
09/05/2025, 7:54 PM — "I've also seen stuff online indicating that you can't call map from inside a task" — you can indeed nest map calls
Nate
09/05/2025, 7:55 PMJon Mason
09/05/2025, 7:55 PMJon Mason
09/05/2025, 7:55 PMNate
09/05/2025, 7:55 PMJon Mason
09/05/2025, 7:56 PMNate
09/05/2025, 7:56 PMNate
09/05/2025, 7:57 PMJon Mason
09/05/2025, 7:57 PMCreated task run 'check_outer-2e7' for task 'check_outer'
Executing task 'check_outer' for task run 'check_outer-2e7'...Jon Mason
09/05/2025, 7:57 PMNate
09/05/2025, 7:57 PMJon Mason
09/05/2025, 7:57 PMcheck_outercheck_innerJon Mason
09/05/2025, 7:58 PMJon Mason
09/05/2025, 7:58 PMMarvin
09/05/2025, 7:59 PMJon Mason
09/05/2025, 7:59 PMMarvin
09/05/2025, 8:01 PMtask.map().result().result().result()check_outerfrom prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner
@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    """True iff the second element of the (bucket, key) tuple *obj* is 'b'."""
    print(f'check_inner({obj})')
    suffix = obj[1]
    return suffix == 'b'
@flow(name='check-outer', log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    """Subflow: map check_inner across every key in *keys* and gather the results."""
    print(f'check_outer({bucket}, {keys})...')
    pairs = [(bucket, key) for key in keys]
    result = check_inner.map(pairs).result()
    print(f'check_outer({bucket}, {keys}) -> {result}')
    return result
@flow(name='dask-test', task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    """Run check_outer twice over copies of *keys* and print both result lists."""
    keys1 = keys.copy()
    keys2 = keys.copy()
    # Call subflows; if you need these to run concurrently, call both without blocking,
    # then wait on their results:
    # NOTE(review): .submit() is the task API; a plain @flow object may not expose
    # it in this Prefect version — confirm the subflow-concurrency API before use.
    f1 = check_outer.submit(bucket, keys1)
    f2 = check_outer.submit(bucket, keys2)
    out = [f1.result(), f2.result()]
    print(f'out={out}')
if __name__ == '__main__':
    example_flow(['a', 'b', 'c'], 'mb-ngs-portal-data-prod')
from prefect_dask import DaskTaskRunner
# Explicit local Dask cluster: 2 workers x 2 threads so nested work has capacity.
task_runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 2},
)@task
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    return [check_inner.fn((bucket, k)) for k in keys].result()wait()PREFECT_LOGGING_LEVEL=DEBUGMarvin
09/05/2025, 8:01 PMDaskTaskRunnerJon Mason
09/05/2025, 8:05 PMMarvin
09/05/2025, 8:05 PMMarvin
09/05/2025, 8:05 PMcheck_outercheck_inner.map(...).result()check_outer.result()check_outerfrom prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner
@task(log_prints=True)
def check_inner(obj: tuple[str, str]) -> bool:
    """Return True when the key component of *obj* equals "b"."""
    print(f"check_inner({obj})")
    key = obj[1]
    return key == "b"
@flow
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    """Subflow wrapper: map check_inner over the keys, then wait for every result."""
    print(f"check_outer({bucket}, {keys})...")
    mapped = check_inner.map([(bucket, key) for key in keys])
    result = mapped.result()
    print(f"check_outer({bucket}, {keys}) -> {result}")
    return result
@flow(name="dask-test", task_runner=DaskTaskRunner(), log_prints=True)
def example_flow(keys: list[str], bucket: str):
    """Map check_outer (declared @flow above) across two copies of *keys*."""
    keys1 = keys.copy()
    keys2 = keys.copy()
    # NOTE(review): check_outer is a @flow here, but .map() is the *task* API —
    # flow objects may not provide it. Confirm against the target Prefect version.
    out = check_outer.map(unmapped(bucket), [keys1, keys2]).result()
    print(f"out={out}")
if __name__ == "__main__":
    example_flow(["a", "b", "c"], "some-bucket").result()@task
def check_outer(bucket: str, keys: list[str]):
    """Kick off check_inner for every key and hand back the unresolved futures."""
    # Deliberately no .result() here — the caller resolves the PrefectFutureList.
    return check_inner.map([(bucket, key) for key in keys])
@flow(task_runner=DaskTaskRunner())
def example_flow(keys: list[str], bucket: str):
    # Fan out check_outer twice; each mapped call yields a PrefectFutureList.
    fut_lists = check_outer.map(unmapped(bucket), [keys, keys])
    # Resolve at the flow level
    results = [futs.result() for futs in fut_lists]
    print(results)from prefect_dask import DaskTaskRunner
# Local Dask cluster sized so concurrent outer tasks and their nested work
# don't starve each other for workers.
runner = DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={
        "n_workers": 2,             # >= number of concurrent outer tasks
        "threads_per_worker": 2,    # allow nested work per worker
        "processes": False          # threads can help avoid deadlock in nested scenarios
    },
)DaskTaskRunner(address="tcp://...")ConcurrentTaskRunner()ThreadPoolTaskRunner()task_run_timeoutMarvin
09/05/2025, 8:05 PMDaskTaskRunnercluster_class/cluster_kwargsMarvin
09/05/2025, 8:07 PM.result()@flowrun_deployment(...)as_subflow=Truecheck_outerfrom prefect import flow, task
@task(log_prints=True)
def check_inner(obj: tuple[str,str]) -> bool:
    """Check whether the (bucket, key) pair *obj* carries the key 'b'."""
    print(f'check_inner({obj})')
    bucket_key = obj[1]
    return bucket_key == 'b'
@flow(log_prints=True)
def check_outer(bucket: str, keys: list[str]) -> list[bool]:
    """Map check_inner over every key in *keys* and block for the full result list."""
    pairs = [(bucket, key) for key in keys]
    return check_inner.map(pairs).result()
@flow(log_prints=True)
def example_flow(keys: list[str], bucket: str):
    # Launch two check_outer subflow runs, then block on both results.
    # NOTE(review): plain @flow objects may not expose .submit() — confirm
    # against the Prefect version in use.
    f1 = check_outer.submit(bucket, keys)
    f2 = check_outer.submit(bucket, keys)
    out = [f1.result(), f2.result()]
    print(f'out={out}')from prefect_dask import DaskTaskRunner
@flow(task_runner=DaskTaskRunner(
    cluster_class="distributed.LocalCluster",
    cluster_kwargs={"n_workers": 2, "threads_per_worker": 1},
), log_prints=True)
def example_flow(...):
    ...run_deployment(...)