# prefect-community
Good Morning Everyone, I wanted to run tasks in parallel using a local Dask cluster, but the tasks are executed sequentially. Why is that? Here is my code snippet
Copy code
start = Parameter("start")
start.set_downstream(s3_connection, flow=flow)
s3_connection.set_downstream(lin_of_bus, flow=flow)
s3_connection.set_downstream(company,  flow=flow)
end_task = final_task()
end_task.set_upstream(company, flow=flow)
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
The schematic diagram below shows the tasks being executed sequentially, not in parallel.
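For reference, a minimal runnable sketch of the same flow shape (task bodies are hypothetical; only the structure matches the snippet above): the two tasks that depend only on s3_connection should be able to run in parallel under a local DaskExecutor.
Copy code
# Minimal sketch (hypothetical task bodies): company and line_of_business depend only
# on s3_connection, so a multi-worker local Dask cluster can run them in parallel.
from prefect import Flow, Parameter, task
from prefect.engine.executors import DaskExecutor

@task
def s3_connection(start):
    return f"connection for {start}"

@task
def line_of_business(conn):
    return "line_of_business result"

@task
def company(conn):
    return "company result"

@task
def final_task(lob, comp):
    return [lob, comp]

with Flow("parallel-example") as flow:
    start = Parameter("start")
    conn = s3_connection(start)
    lob = line_of_business(conn)
    comp = company(conn)
    final_task(lob, comp)

if __name__ == "__main__":
    flow.run(parameters={"start": "hello"}, executor=DaskExecutor())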
Hi Saranya, there are a few reasons why this might be happening; without knowing more I can't be more helpful.
• Does your dask cluster have more than one worker? If there's only one worker/thread to do work, then all tasks will still run sequentially (see the sketch after this list for sizing the cluster explicitly).
• Do your tasks run relatively quickly? Dask's heuristics might determine that it's more efficient to run all tasks sequentially rather than in parallel.
• Do your tasks return large amounts of data? Dask's heuristics here might also determine it's more efficient to run sequentially rather than serialize the data between tasks.
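A minimal sketch of sizing the temporary local cluster explicitly via cluster_kwargs (the worker and thread counts below are illustrative, not recommendations):
Copy code
# Hedged sketch (Prefect 0.13-era API): explicitly size the temporary local Dask
# cluster so multiple workers are available to run tasks in parallel.
from prefect.engine.executors import DaskExecutor

executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}  # illustrative sizing
)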
@Jim Crist-Harif 1. I don't have a dask cluster. Based on the documentation, by not specifying a scheduler address the DaskExecutor will create a local Dask cluster and begin executing tasks on it (please correct me if I am wrong). Where can I check whether that local Dask cluster is single- or multi-threaded? 2. Each task takes a minute or two. 3. Most of the tasks return <10KB of data, except 2 tasks which return around 1 to 2MB of data.
Hmmm, that is odd. A temporary dask cluster should use roughly the same number of worker processes as you have cores on your computer, so unless you're on a 1-core machine that shouldn't be the problem. Can you run with
cluster_kwargs={"silence_logs": 10}
passed to DaskExecutor (10 is the DEBUG logging level) and report back with the logs (you may want to glance through them to remove anything you'd consider private).
@Jim Crist-Harif It didn't help, Jim. Here is what I have
Copy code{"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
but it still runs sequentially (screenshot attached: Screen Shot 2020-09-01 at 9.07.59 PM)
@Jim Crist-Harif Any suggestions?
Sorry, that wasn't a fix, that was to get better logs so we can help you debug. Can you report back with the logs from that run?
@Jim Crist-Harif Attached logs
Hi Saranya, it looks like you're doing a cloud/server flow run rather than running with flow.run() (you can tell, since the logs have CloudFlowRunner in them). I assume you've registered the flow elsewhere. To configure the executor to use with a cloud/server flow run, you need to specify the executor as part of the environment. Try doing
Copy code
flow.environment.executor = DaskExecutor()
before registering your flow.
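A minimal sketch of that ordering, assuming the flow is registered from the same script (the project name is hypothetical):
Copy code
from prefect.engine.executors import DaskExecutor

# Configure the executor on the flow's environment first...
flow.environment.executor = DaskExecutor()

# ...then register, so the executor configuration is captured with the flow.
flow.register(project_name="my-project")  # hypothetical project name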
@Jim Crist-Harif 1. If I have
Copy code
flow.environment.executor = DaskExecutor()
set on the environment, will it make the whole flow run in parallel? 2. I have this line in my code. Does CloudFlowRunner override it? Is there a way to make it take preference?
Copy code
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
flow.run() is for running locally; the logs you've posted are from a cloud run. Generally you have two options when running Prefect flows:
1. Write a script that builds a flow, then calls flow.run() at the end. Run the script directly with Python. This will use the configuration (including any executor) you pass to flow.run().
2. Write a script that builds a flow, configures the flow's environment, then calls flow.register() at the end. Run the script directly with Python; this will register your flow. You can then schedule cloud/server flow runs via cloud/server. See the docs for more information.
will it make the whole flow run in parallel?
For this flow, yes. Note that environment configuration only affects cloud/server flow runs, while passing an executor to flow.run() only affects flow.run() (a sketch contrasting the two follows).
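A compact sketch of the two modes, under the same assumptions as above (Prefect 0.13-era API, hypothetical project name):
Copy code
from prefect.engine.executors import DaskExecutor

# Local run: the executor passed here is only used by flow.run().
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())

# Cloud/server run: the executor must be set on the environment before registration.
flow.environment.executor = DaskExecutor()
flow.register(project_name="my-project")  # hypothetical project name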
@Jim Crist-Harif My entire team runs flows in cloud. I am looking for parallel runs only for my flow and do not want to affect any other flow. Is there a way to do that?
Configuration set on your flow won't affect other flows. Setting flow.environment.executor will only affect that flow.
@Jim Crist-Harif I have 2 files, attached: 1. one contains the flow information, 2. the other registers the flow. I ran it 3 times with the line flow.environment.executor = DaskExecutor(): 1. only in file 1, 2. only in file 2, 3. in both files 1 and 2. It still runs sequentially. Can you please take a look at those 2 files and guide me on what I am missing here?
In your registration script you redefine the environment on line 42, so the executor setting is lost. I see you re-add it on line 66, but you do that after the call to flow.register(); you need to do that before the call to flow.register(). You can then remove setting the executor in the other file.
Added as suggested. still sequential 😞
You're not rebuilding your flow, so the environment config isn't actually stored in your docker image. You'll want to:
• drop
• drop the line setting
Alternatively, move the line where you're configuring your executor to after line 42 so it happens before you call flow.register().
@Jim Crist-Harif I tried both options and attached files for your reference. Am I making any other mistakes?
Yeah, a couple places. Try this:
Copy code
from prefect.environments.storage import Docker
from prefect.environments import KubernetesJobEnvironment
import sys
import os
import yaml

from prefect.engine.executors import DaskExecutor


def build_prefect_flow_image():
    """Builds the prefect flow by adding the kubernetes job environment that is specified in the values
    folder for the s3 service account helm templates. It then stores the flow in a docker image.
    It then registers the flow with prefect cloud.
    """
    IMAGE_NAME = os.environ["IMAGE_NAME"]
    ECR_BASE_URL = os.environ["ECR_BASE_URL"]
    IMAGE_TAG = os.environ["TAG"]
    PREFECT_LABELS = os.getenv("PREFECT_LABELS", "")
    KUBERNETES_FILE = os.path.join("values", os.environ["K8S_FILE_NAME"])

    # Set the path to be the folder structure that holds the "app" and "values" folder

    # Set the environment variables to what is in the values folder. If we don't do this prefect is an unhappy technology.
    # I'm not sure this is the best way to do it, so we may want to consider if we should do it a different way
    with open("values/stg.yaml") as file:
        stg = yaml.load(file, Loader=yaml.FullLoader)
        if 'envVars' in stg:
            for var in stg['envVars']:
                os.environ[var] = stg['envVars'][var]

    # Assumes you're running in a directory with a module that exports a flow
    # Import the flow
    from flow_module import flow  # placeholder module name; use the module that defines your flow

    # We want to set prefect to create a job with a kubernetes environment that we have set up. This means that the
    # prefect job that is usually created will create another job with the given kubernetes file.
    # The kubernetes job file is created in the CICD pipeline. It holds the environment variables, service account,
    # docker image, flow, etc
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())

    # Our prefect flow and kubernetes info will be stored in a docker image that we host in AWS
    flow.storage = Docker(
        registry_url=ECR_BASE_URL,
        image_name=IMAGE_NAME,
        image_tag=IMAGE_TAG,
        env_vars=dict(JOB_NAME=os.environ["JOB_NAME"], ENV=os.environ["ENV"]),
    )

    # flow.register(...) follows here, as in your existing script


if __name__ == "__main__":
    build_prefect_flow_image()
@Jim Crist-Harif Build failed with the error below.
Copy code
File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/", line 35, in build_prefect_flow_image
250    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
251TypeError: __init__() got an unexpected keyword argument 'executor'
252ERROR: Job failed: command terminated with exit code 1
KubernetesJobEnvironment's __init__ has an executor argument. Why am I getting that error?
It looks like you might be on an older version of prefect, what version are you using?
Copy code
$ prefect version
On my local mac it's 0.13.1, while in the image build it's 0.11.2
Ah, you'll need to update your images to at least 0.12.1. The latest release is 0.13.5.
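One way to keep the image in step, assuming the image is built via Prefect's Docker storage (the prefect_version keyword is an assumption about that storage class; check your version's docs):
Copy code
# Hedged sketch: pin the prefect version installed into the flow's Docker image so it
# matches a release that supports environment executors (0.12.1+).
from prefect.environments.storage import Docker
import os

flow.storage = Docker(
    registry_url=os.environ["ECR_BASE_URL"],
    image_name=os.environ["IMAGE_NAME"],
    image_tag=os.environ["TAG"],
    prefect_version="0.13.5",  # assumed keyword; by default the local prefect version is used
)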
ok. let me use 0.13.5 for image
@Jim Crist-Harif Prefect versions 0.13.5 and 0.13.1 are throwing the following error during the image build. I didn't receive this error in 0.11.2
Copy code
File "/usr/lib/python3.7/json/", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/", line 70, in <module>
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/", line 66, in build_prefect_flow_image
@Jim Crist-Harif Any suggestions on the above error? Is it due to the version upgrade?
Sorry for the delayed reply here Saranya, I'm not sure what your error is. On first pass it looks like cloud is rejecting your flow, but the error message isn't getting returned to the user properly. Would you mind opening an issue with:
• A reproducible example of your problem
• The version of prefect that you're using
Thanks!