# prefect-community
Thomas Wiecki
so I can’t use map() with a multiprocessing scheduler?
Zachary Hughes
Hi @Thomas Wiecki, would you mind clarifying what you'd like to do? I want to make sure I'm giving you the proper advice. 🙂
Thomas Wiecki
Hi @Zachary Hughes! Thanks! I want to run multiple pymc3 models in parallel with different parameter sets.
Zachary Hughes
That's super useful, thank you! If you want to map in parallel, I think Dask might be the right call. I've linked a recipe for a Dask cluster; by spinning up multiple workers, you should be able to achieve the parallelization you're looking for. Does this sound like it might do the trick? https://docs.prefect.io/orchestration/recipes/k8s_dask.html#dask-cluster-on-kubernetes
Thomas Wiecki
so using kubernetes would work? is there an intuitive reason why map doesn’t work with DaskExecutor and multiprocessing?
Zachary Hughes
Let me ask the team and I'll get back to you ASAP!
Thomas Wiecki
appreciate it
Zachary Hughes
CCing @Jim Crist-Harif, our resident Dask expert!
But to answer your first question: I'd expect this Kubernetes recipe to work, but you shouldn't need Kubernetes to achieve parallelization. Dask alone should do the trick.
Thomas Wiecki
so this is quite odd
and I wonder if that’s true for using Kubernetes too
Jim Crist-Harif
This has to do with how `.map` is implemented currently. The current implementation relies on tasks being able to submit additional tasks. The mapping execution mechanism is currently being reworked to support DFE (https://github.com/PrefectHQ/prefect/issues/2041), and one thing that will fall out of this is that tasks will no longer submit additional tasks. The reason this fails for multiprocessing is that dask's local scheduler (which can use either threads or processes) expects to get all work submitted up front, then compute the graph in one pass. We recommend using the `distributed` scheduler (which works fine locally), which natively supports tasks submitting tasks. This will be less necessary after the mapping refactor, but the distributed scheduler is generally nicer and more performant than the multiprocessing scheduler (as a Dask dev, we generally recommend users avoid the multiprocessing scheduler, using either threads or distributed instead).
So in short:
• Currently things won't work with the multiprocessing scheduler due to how Prefect is written.
• In the future this will be changed due to the mapping refactor.
• Generally though we recommend users of Dask use either threads or the distributed scheduler (which can be deployed locally just fine).
• The distributed scheduler can be run locally, or on many clustering backends. The deployment mechanism (e.g. Kubernetes) doesn't affect the execution; these are separate components.
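For anyone curious, here's a minimal pure-Dask sketch of the "tasks submitting tasks" pattern described above (the names `double` and `launch_more` are illustrative only):
```python
from dask.distributed import Client, worker_client


def double(x):
    return 2 * x


def launch_more(xs):
    # From inside a running task, open a client back to the scheduler
    # and submit additional tasks. The distributed scheduler supports
    # this natively; the local multiprocessing scheduler cannot, since
    # it expects the whole graph up front.
    with worker_client() as client:
        futures = client.map(double, xs)
        return client.gather(futures)


if __name__ == "__main__":
    # A local distributed cluster -- no remote deployment required.
    client = Client(n_workers=2, threads_per_worker=1)
    print(client.submit(launch_more, [1, 2, 3]).result())  # [2, 4, 6]
```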
Does that answer your question @Thomas Wiecki?
Thomas Wiecki
@Jim Crist-Harif so the distributed scheduler does work with map?
Jim Crist-Harif
Yes
Thomas Wiecki
thanks @Jim Crist-Harif!
do you have a link so that I know I’m doing the right thing?
Jim Crist-Harif
Hmmm, it looks like we don't have a nice example that doesn't assume a remote cluster (i.e. the cluster is created as a separate step, which I'd argue is an antipattern for most users).
```python
from prefect import Flow
from prefect.engine.executors import DaskExecutor


with Flow(...) as flow:
    # create your flow
    ...

executor = DaskExecutor(n_workers=4, threads_per_worker=1)
flow.run(executor=executor)
```
The above example creates a local dask cluster with 4 worker processes, each with 1 thread, and runs the flow on that cluster. The cluster lifetime is contained in the `flow.run` call, and the cluster will be cleaned up afterwards.
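If you already have a distributed cluster running (e.g. from the Kubernetes recipe linked earlier), you can instead point the executor at it rather than letting `flow.run` create one; the address below is a placeholder:
```python
from prefect.engine.executors import DaskExecutor

# Connect to an existing distributed scheduler instead of spinning up
# a local cluster; substitute your scheduler's real address.
executor = DaskExecutor(address="tcp://dask-scheduler:8786")
flow.run(executor=executor)
```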
Thomas Wiecki
but it’s not using map
and as per my new understanding that wouldn’t work because it’s still using DaskExecutor?
Jim Crist-Harif
The map code should be the same regardless of executor (except that map doesn't work with `processes`).
Thomas Wiecki
```python
# Map over two coins
with Flow('CoinFlow') as flow:
    flips = flip_coin.map([.5, .25])
    infer_p.map(flips)

flow.run(executor=DaskExecutor(n_workers=4, threads_per_worker=1))
```
so that should work?
I’m quite confused
Jim Crist-Harif
Stepping back a bit: Dask has two types of schedulers:
• Local schedulers (using either threads or processes).
• The distributed scheduler (which uses a mix of threads and processes).

Prefect can use any of these options. The `DaskExecutor` uses a distributed scheduler internally; the `LocalDaskExecutor` uses one of the local schedulers internally. `.map` doesn't work with the `LocalDaskExecutor` when using processes, but does with threads.
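A sketch contrasting those options (the `double` task is illustrative, not from the thread):
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor


@task
def double(x):
    return 2 * x


with Flow('example') as flow:
    doubled = double.map([1, 2, 3])

# Distributed scheduler, started locally: .map works.
flow.run(executor=DaskExecutor(n_workers=4, threads_per_worker=1))

# Local threaded scheduler: .map also works.
flow.run(executor=LocalDaskExecutor(scheduler="threads"))

# Local multiprocessing scheduler: .map does NOT currently work.
# flow.run(executor=LocalDaskExecutor(scheduler="processes"))
```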
Thomas Wiecki
ohh ok
thanks
Jim Crist-Harif
Did that help clarify? Sorry this is a bit confusing, there's a lot of concepts here.
Thomas Wiecki
however, I do get an error when using DaskExecutor in parallel
which is the same error I get when running multithreaded
Jim Crist-Harif
Hmmm, can you post an example?
Thomas Wiecki
requires pymc3
Jim Crist-Harif
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor


@task
def flip_coin(x):
    return x


@task
def infer_p(x):
    return x + 1


with Flow('CoinFlow') as flow:
    flips = flip_coin.map([.5, .25])
    infer_p.map(flips)

flow.run(executor=DaskExecutor(n_workers=4, threads_per_worker=1))
```
Thomas Wiecki
right that will definitely work
Jim Crist-Harif
Using your example from above, this works for me.
Thomas Wiecki
but it seems like as soon as I use theano with DaskExecutor (as the notebook does) it breaks down
it could be unrelated to threads, but the error is the same as when I run with the local executor and threads
and I can’t imagine where else it’s coming from
Jim Crist-Harif
I think this has nothing to do with prefect itself, and has something to do with theano not playing nice with dask distributed.
I'm not familiar enough with theano to say. It looks like theano is trying to use multiprocessing itself as well?
A rendered ipynb of Thomas's example, for anyone interested: https://gist.github.com/jcrist/8d1dd1ef93fcd2baab871efff615d028
Note that this is a different error than the one indicating `flow.map` doesn't work with `processes`; this one occurs during execution and is raised by theano.
Thomas Wiecki
yeah, I think that’s likely
Thomas Wiecki
I think I figured it out
if I set test_values to ignore it works
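For anyone hitting the same issue, a sketch of that workaround (assuming it maps to theano's `compute_test_value` flag, set before building the pymc3 model):
```python
import theano

# Ignore failures from theano's test-value machinery
# (the source of the error described above).
theano.config.compute_test_value = "ignore"
```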
Jim Crist-Harif
Oh, good. Glad you got things working.
Thomas Wiecki
thanks for your help, prefect is awesome!