Josh Greenhalgh

01/27/2021, 5:24 PM
Do mapped tasks only behave in a parallel fashion on Dask? I am running on k8s using a job per flow, and all my mapped tasks log in order, sequentially; there's also a huge gap between two mapped tasks. I was under the impression that if you have:
p1_res = extract_raw_from_api.map(
    asset_location=asset_locations,
    extract_date=unmapped(extract_date),
    extract_period_days=unmapped(extract_period_days),
)
p2_res = process_2.map(p1_res)
p3_res = process_3.map(p2_res)
Then as soon as the first task from p1_res is done, the corresponding process_2 task should start? As it stands, no process_2 tasks start until many of the extract_raw_from_api tasks are already complete...

josh

01/27/2021, 5:31 PM
Hi @Josh Greenhalgh, yes, parallel execution in Prefect is accomplished by using a Dask executor.
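For reference, attaching an executor looks roughly like this (a minimal sketch with placeholder task names, assuming the 0.14-style prefect.executors import path and the flow-level executor argument):

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def add_one(x):
    return x + 1

# With a DaskExecutor attached, the mapped children can run in parallel on Dask workers
with Flow("mapped-example", executor=DaskExecutor()) as flow:
    results = add_one.map([1, 2, 3])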

Josh Greenhalgh

01/27/2021, 5:31 PM
Ok, and there's no way to turn the mapped tasks into individual k8s jobs?

josh

01/27/2021, 5:32 PM
Not at this time, but it is something we have heard requests for and are actively exploring!

Josh Greenhalgh

01/27/2021, 5:33 PM
So on k8s (without Dask) there's no parallelism at all? That makes me sad 😞
Thanks @josh

josh

01/27/2021, 5:33 PM
Technically, if you wanted to, you could map over the k8s CreateNamespacedJob task, where each mapped task would execute an independent Kubernetes Job, but that may be out of scope for what you want.
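Very roughly, that pattern could look something like this (an untested sketch; it assumes CreateNamespacedJob accepts a Kubernetes Job manifest dict as body, and build_job_spec is a hypothetical helper, not part of the task library):

from prefect import Flow, task
from prefect.tasks.kubernetes import CreateNamespacedJob

create_job = CreateNamespacedJob(namespace="default")

@task
def build_job_spec(asset_location):
    # Hypothetical helper: build a batch/v1 Job manifest for one asset location
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"generateName": "extract-"},
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {"name": "extract", "image": "my-registry/extract:latest", "args": [asset_location]}
                    ],
                    "restartPolicy": "Never",
                }
            }
        },
    }

with Flow("k8s-job-per-element") as flow:
    specs = build_job_spec.map(["loc-a", "loc-b"])
    create_job.map(body=specs)  # one Kubernetes Job per mapped element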

Josh Greenhalgh

01/27/2021, 5:34 PM
That's actually my next thought...
It would be really great to have an executor that spat out k8s jobs for every task!

Jim Crist-Harif

01/27/2021, 5:35 PM
Note that you don't need to use a distributed executor (DaskExecutor) to get parallelization, but you do need to use something other than the default LocalExecutor. If your tasks are all IO-bound (for example, starting and monitoring external k8s jobs), you might find the LocalDaskExecutor sufficient. This uses a local thread pool to run tasks in parallel.
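For example, something like this (just a sketch; the scheduler and num_workers values are only illustrative, and the flow body stands in for the mapped tasks above):

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow(
    "extract-and-process",
    executor=LocalDaskExecutor(scheduler="threads", num_workers=8),
) as flow:
    ...  # same mapped tasks as in the flow above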

Josh Greenhalgh

01/27/2021, 5:36 PM
Ohhh, so that would start a Dask cluster within the pod and then use that?
Well, not a cluster I suppose, but a Dask scheduler...

Jim Crist-Harif

01/27/2021, 5:37 PM
Yeah

Josh Greenhalgh

01/27/2021, 5:38 PM
Ok thanks! Will give that a go!
The only change I made was adding the executor:
with Flow(name=name, storage=storage, run_config=run_config, schedule=schedule, executor=DaskExecutor()) as flow:
And now my mapped tasks don't even start; I get a pickling error about max recursion depth 😞

Jim Crist-Harif

01/27/2021, 6:09 PM
Hmmm, sounds like an issue with your tasks. Can you provide the traceback, or a reproducible example?
(Also note that if you swap in executor=LocalDaskExecutor() you won't get runtime pickling, since all your tasks run in the same process. This shouldn't be required, though; the error you got above isn't expected.)
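One quick way to chase that down outside of a flow run (a sketch, reusing the names from the flow above) is to call the task function directly and try cloudpickling its return value; an unpicklable result should reproduce the same RecursionError:

import cloudpickle

# .run on a @task-decorated function calls the underlying Python function directly
payload = extract_raw_from_api.run(
    asset_location=asset_locations[0],
    extract_date=extract_date,
    extract_period_days=extract_period_days,
)
cloudpickle.dumps(payload)  # fails if the task's return value can't be pickled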

Josh Greenhalgh

01/27/2021, 6:13 PM
Yeah, I am having trouble with everything! I think the first thing to solve is my puny nodes, and then maybe this will go away...
Thanks @Jim Crist-Harif and all the other really responsive prefect folks!