How do I get a mapped task to execute in parallel ...
# ask-community
s
How do I get a mapped task to execute in parallel when orchestrated in Prefect Cloud? I can use the LocalDaskExecutor locally via flow.run(executor=LocalDaskExecutor()) to get it to spawn multiple threads and execute in parallel, but when I register the same flow to Cloud, it seems to execute the mapped tasks in sequence.
z
Hi Scott, this sounds like it could be an executor issue. Do you mind checking what executor you have on the remote environment for your registered flow?
s
It's set to KubernetesJobEnvironment with a job_spec_file. Is there a way to specify the executor within the K8s Pod it creates? I don't think it needs to spawn a full Dask cluster. The Flow in question needs to ping an off-network API for a lot of observations, which one Pod should be able to handle.
j
You should be able to specify your executor and the scheduler address in the env vars of the job spec YAML with something like:
- name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
  value: "prefect.engine.executors.DaskExecutor"
- name: PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS
  value: "tcp://your-scheduler:8786"
s
I used the LocalDaskExecutor in the yaml and it's churning through them nicely now!
- name: PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS
  value: "prefect.engine.executors.LocalDaskExecutor"
Thanks for the heads-up on the environment variable configuration. Are there other environment variables I should be aware of? Are they all documented somewhere?
j
Anything in Prefect’s config.toml can be set via env var 🙂 https://github.com/PrefectHQ/prefect/blob/master/src/prefect/config.toml
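The two variables used earlier in this thread suggest the naming pattern: a PREFECT prefix, then each level of the config.toml key, upper-cased and joined with double underscores. A minimal sketch of that mapping follows; the helper name config_key_to_env_var is hypothetical and not part of Prefect, and the rule is inferred from the examples above rather than taken from the library's source.

```python
def config_key_to_env_var(dotted_key: str, prefix: str = "PREFECT") -> str:
    """Build an env var name from a dotted config key.

    Hypothetical helper: the rule (prefix + upper-cased sections joined
    by double underscores) is inferred from the variables in this thread.
    """
    parts = [prefix] + dotted_key.split(".")
    return "__".join(part.upper() for part in parts)


if __name__ == "__main__":
    # The two settings discussed above, written as dotted config keys.
    for key in ("engine.executor.default_class", "engine.executor.dask.address"):
        print(config_key_to_env_var(key))
```

Running it prints the same two names that appear in the job spec snippets above.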
s
What's the order of preference? If I have an env var set and something in config.toml, which value wins?
j
The env var
The preference should be: env var > custom config > default config, with the leftmost taking precedence.
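That lookup order can be sketched as a first-match search across three sources. This is only an illustration of the precedence rule stated above, not Prefect's actual config loader; the function name resolve_setting and the plain-dict sources are assumptions made for the example.

```python
def resolve_setting(key, env, custom_config, default_config):
    """Return the value from the highest-priority source that defines key.

    Priority, highest first: env var > custom config > default config.
    Hypothetical sketch; plain dicts stand in for the real sources.
    """
    for source in (env, custom_config, default_config):
        if key in source:
            return source[key]
    raise KeyError(key)


if __name__ == "__main__":
    key = "engine.executor.default_class"
    default_config = {key: "default-executor"}   # placeholder values
    custom_config = {key: "custom-executor"}
    env = {key: "env-executor"}

    print(resolve_setting(key, env, custom_config, default_config))  # env wins
    print(resolve_setting(key, {}, custom_config, default_config))   # custom wins
    print(resolve_setting(key, {}, {}, default_config))              # default
```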
s
Actually I spoke too soon: my default scheduler for LocalDaskExecutor is "synchronous", and I don't see a way to specify the scheduler parameter for the executor via config.toml (I want to set it to threads). Guess I'll play around with DaskExecutor.
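To show why the scheduler choice matters for an I/O-bound mapped task like the API calls described earlier, here is a library-independent sketch using only the Python standard library. It does not use Prefect or Dask; fetch_observation is a hypothetical stand-in for the real API call, with a short sleep simulating network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_observation(station_id):
    """Hypothetical stand-in for one API request (sleep simulates latency)."""
    time.sleep(0.05)
    return station_id * 2


def run_synchronous(ids):
    # One call at a time, like a "synchronous" scheduler.
    return [fetch_observation(i) for i in ids]


def run_threaded(ids, max_workers=8):
    # Calls overlap while each thread waits on I/O, like a "threads" scheduler.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch_observation, ids))


if __name__ == "__main__":
    station_ids = list(range(8))
    for runner in (run_synchronous, run_threaded):
        start = time.perf_counter()
        results = runner(station_ids)
        elapsed = time.perf_counter() - start
        print(f"{runner.__name__}: {len(results)} results in {elapsed:.2f}s")
```

Both runners return the same results in the same order; the threaded version just spends the waiting time concurrently, which is the behaviour being asked for here.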