# prefect-community
a
For some reason my tasks are not running in parallel. The logs say `Using executor type DaskExecutor` (I use a local one).
```python
import time

from prefect import task, Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment


@task
def wait_for(seconds):
    time.sleep(seconds)


with Flow('_Concurrency_') as flow:
    head = wait_for(5)
    leaf1 = wait_for(30)
    leaf2 = wait_for(15)
    tail = wait_for(3)

    head.set_dependencies(downstream_tasks=[leaf1, leaf2])
    leaf1.set_downstream(tail)
    leaf2.set_downstream(tail)

flow.environment = LocalEnvironment(executor=DaskExecutor())
flow.executor = DaskExecutor()
```
I expect leaf1 and leaf2 to run in parallel, but they don't. In the Gantt chart I see them running sequentially. If I run them locally, they run in parallel. BTW, what's the difference between the environment executor and the flow executor? Flows are run by a Docker agent.
n
Hi @Alexander - try using a LocalDaskExecutor instead, like this:
```python
from prefect.engine.executors import LocalDaskExecutor

###

flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads", num_workers=6))
```
To your question: the difference is that the flow executor is used when running flows locally (`flow.run`) and the environment executor is used when running them with an environment.
🎉 1
a
Oh my god, I got it, OK. So the culprit here is `num_workers`. I was digging through the Prefect source code to find out why it was not running in parallel, and the reason was simple: `num_workers = dask.config.get("num_workers", None) or CPU_COUNT`
🤦‍♂️
🚀 2
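The fallback quoted above can be illustrated with the standard library alone. This is a minimal sketch, not Prefect's or Dask's actual code: `resolve_num_workers` and `run_leaves` are hypothetical helpers, a `ThreadPoolExecutor` stands in for the thread scheduler, and the sleeps are scaled down from the flow in the question. It shows that when no worker count is configured and the machine reports a single CPU, the pool would get one worker and the two leaves would run back to back.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def resolve_num_workers(configured=None):
    """Hypothetical helper mirroring `configured or CPU_COUNT`."""
    # os.cpu_count() may return None, so fall back to 1 in that case.
    return configured or os.cpu_count() or 1


def run_leaves(num_workers, leaf_seconds=(0.3, 0.15)):
    """Run the two leaf sleeps on a thread pool; return elapsed wall time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # list() drains the iterator so every sleep has finished
        # before the timer is read.
        list(pool.map(time.sleep, leaf_seconds))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("resolved workers:", resolve_num_workers())
    print("1 worker :", round(run_leaves(1), 2))  # leaves run sequentially
    print("2 workers:", round(run_leaves(2), 2))  # leaves overlap
```

With one worker the elapsed time is roughly the sum of the two sleeps; with two it is roughly the longer one. Passing an explicit `num_workers`, as in the suggestion above, avoids depending on the reported CPU count.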
a
@nicholas are there any docs explaining this little, but substantial, difference?
n
Hi @ale - if you mean the difference between executors, I'm not sure there are docs about that specifically; it's more an implicit difference, since environments aren't configured for flows running locally. If you're looking for the difference between `LocalDaskExecutor` and `DaskExecutor`, you can read about that here, with the primary difference between the two being the scheduler: the `DaskExecutor` uses the Dask distributed scheduler, while the `LocalDaskExecutor` can use any scheduler but doesn't leverage all the distributed features of Dask.