Do mapped tasks only behave in a parallel fashion ...
# prefect-community
j
Do mapped tasks only behave in a parallel fashion on Dask? I am running on k8s using a job per flow, and all my mapped tasks log sequentially, in order. There's also a huge gap between two mapped tasks. I was under the impression that if you have:
from prefect import Flow, unmapped

with Flow("etl") as flow:  # flow name is illustrative
    p1_res = extract_raw_from_api.map(
        asset_location=asset_locations,
        extract_date=unmapped(extract_date),
        extract_period_days=unmapped(extract_period_days),
    )
    p2_res = process_2.map(p1_res)
    p3_res = process_3.map(p2_res)
Then as soon as the first task of p1_res is done, the corresponding process_2 task should start? As it stands, no process_2 tasks start until many of the extract_raw_from_api tasks are already complete...
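The expectation in the question is the difference between breadth-first and depth-first execution of a mapped pipeline. The sketch below is plain Python, not Prefect's scheduler; the stage names and lambdas are hypothetical stand-ins for extract_raw_from_api, process_2 and process_3. It only models the log order you would see under each strategy.

```python
# Illustrative model only: plain Python, not how Prefect schedules tasks.
# Each stage stands in for one mapped task in the pipeline.

def run_breadth_first(items, stages):
    """Finish every child of one stage before starting the next stage."""
    log = []
    results = list(items)
    for stage_name, fn in stages:
        next_results = []
        for i, value in enumerate(results):
            next_results.append(fn(value))
            log.append(f"{stage_name}[{i}]")
        results = next_results
    return results, log


def run_depth_first(items, stages):
    """Push each item through all stages before moving on to the next item."""
    log = []
    results = []
    for i, value in enumerate(items):
        for stage_name, fn in stages:
            value = fn(value)
            log.append(f"{stage_name}[{i}]")
        results.append(value)
    return results, log


stages = [
    ("extract", lambda x: x * 10),
    ("process_2", lambda x: x + 1),
    ("process_3", lambda x: x * 2),
]

bf_results, bf_log = run_breadth_first([1, 2], stages)
df_results, df_log = run_depth_first([1, 2], stages)
print(bf_log)  # → ['extract[0]', 'extract[1]', 'process_2[0]', 'process_2[1]', 'process_3[0]', 'process_3[1]']
print(df_log)  # → ['extract[0]', 'process_2[0]', 'process_3[0]', 'extract[1]', 'process_2[1]', 'process_3[1]']
```

Both orderings produce the same results; only the order (and therefore when the first downstream task can start) differs.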
j
Hi @Josh Greenhalgh, yes, parallel execution in Prefect is accomplished by using a Dask executor.
j
OK, and there's no way to turn the mapped tasks into individual k8s jobs?
j
Not at this time but it is something we have heard requests about and are actively exploring!
j
So on k8s (without Dask) there's no parallelism at all? That makes me sad 😞
Thanks @josh
j
Technically, if you wanted to, you could map over the k8s CreateNamespacedJob task, where each task would execute an independent Kubernetes job, but that may be out of scope for what you want.
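Mapping over job creation means producing one Kubernetes Job manifest per mapped item. A minimal sketch of that step, assuming a hypothetical make_job_body helper, image name and container args (none of these come from the thread), builds standard batch/v1 Job bodies as plain dicts:

```python
import re


def make_job_body(asset_location: str, image: str, namespace: str = "default") -> dict:
    """Build a batch/v1 Job manifest (as a dict) for one mapped item.

    Hypothetical helper: the image, args and naming scheme are illustrative.
    """
    # Keep names to lowercase alphanumerics and '-', at most 63 chars,
    # so they are safe as Kubernetes object names and label values.
    safe = re.sub(r"[^a-z0-9-]+", "-", asset_location.lower()).strip("-")
    name = f"extract-{safe}"[:63].rstrip("-")
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "backoffLimit": 2,
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "extract",
                            "image": image,
                            "args": ["--asset-location", asset_location],
                        }
                    ],
                }
            },
        },
    }


asset_locations = ["s3://bucket/Site_A", "s3://bucket/site-b"]
bodies = [make_job_body(loc, image="example.com/extract:latest") for loc in asset_locations]
for body in bodies:
    print(body["metadata"]["name"])  # extract-s3-bucket-site-a, then extract-s3-bucket-site-b
```

Each dict could then be fed to a mapped CreateNamespacedJob as its job body; check the Prefect task library docs for the exact argument names. Note that creating a job is not the same as waiting for it to finish, so monitoring completion would still be a separate concern.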
j
That's actually my next thought...
It would be really great to have an executor that spat out k8s jobs for every task!
j
Note that you don't need to use a distributed executor (DaskExecutor) to get parallelization, but you do need to use something other than the default LocalExecutor. If your tasks are all IO-bound (for example, starting and monitoring external k8s jobs), you might find the LocalDaskExecutor sufficient. This uses a local thread pool to run tasks in parallel.
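Why a thread pool is enough for IO-bound work can be shown with the standard library alone. This sketch uses concurrent.futures, not Dask, and fake_io_task is a hypothetical stand-in for a task that mostly waits (for example, polling an external k8s job). Waiting releases the GIL, so several waits overlap; CPU-bound pure-Python tasks would not speed up the same way.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_task(seconds: float) -> float:
    """Stand-in for an IO-bound task such as polling an external job."""
    time.sleep(seconds)  # sleeping releases the GIL, like waiting on the network
    return seconds


durations = [0.2] * 5

# Run the tasks one after another (roughly what a sequential executor does).
start = time.perf_counter()
sequential = [fake_io_task(d) for d in durations]
sequential_elapsed = time.perf_counter() - start

# Run the same tasks on a local thread pool.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as pool:
    threaded = list(pool.map(fake_io_task, durations))
threaded_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, threaded: {threaded_elapsed:.2f}s")
```

In the flow itself the equivalent change is just passing executor=LocalDaskExecutor() to Flow (importable from prefect.executors in Prefect 0.14 and later), which keeps everything inside the flow-run pod.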
j
Ohhh, so that would start a Dask cluster within the pod and then use that?
Well, not a cluster I suppose, but a Dask scheduler...
j
Yeah
j
OK, thanks! Will give that a go!
The only change I made was adding the executor:
from prefect.executors import DaskExecutor
with Flow(name=name, storage=storage, run_config=run_config, schedule=schedule, executor=DaskExecutor()) as flow:
And now my mapped tasks don't even start; I get a pickling error about maximum recursion depth 😞
j
Hmmm, sounds like an issue with your tasks. Can you provide the traceback, or a reproducible example?
(Also note that if you swap in executor=LocalDaskExecutor() you won't get runtime pickling, since all your tasks run in the same process. This shouldn't be required, though; the error you got above isn't expected.)
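Because the DaskExecutor serializes tasks and their inputs, one way to narrow down a pickling error before building a reproducible example is to try pickling the suspect objects directly. This is a rough sketch: check_picklable and ApiClient are hypothetical, and it uses stdlib pickle, whereas Dask uses cloudpickle (which accepts more, e.g. lambdas), so a failure here is a hint rather than proof.

```python
import pickle
import threading


def check_picklable(obj, name="object"):
    """Return (ok, message) describing whether obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
    except RecursionError as exc:
        # Very deeply nested structures can exhaust the recursion limit.
        return False, f"{name}: recursion error while pickling ({exc})"
    except Exception as exc:  # PicklingError, TypeError, AttributeError, ...
        return False, f"{name}: {type(exc).__name__}: {exc}"
    return True, f"{name}: ok"


class ApiClient:
    """Hypothetical client holding a lock, a common unpicklable attribute."""

    def __init__(self):
        self.lock = threading.Lock()


print(check_picklable({"extract_date": "2021-01-01"}, "params"))
print(check_picklable(ApiClient(), "client"))
```

Running a check like this over each mapped input (and any objects the task functions close over) usually points at the offending value faster than reading the executor's traceback.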
j
Yeah, I am having trouble with everything! I think the first thing to solve is my puny nodes, and then maybe this will go away...
Thanks @Jim Crist-Harif and all the other really responsive prefect folks!