# prefect-community
s
Good Morning Everyone, I wanted to run tasks in parallel using a local Dask cluster (https://docs.prefect.io/core/idioms/parallel.html), but the tasks are executed sequentially. Why is that? Here is my code snippet:
Copy code
from prefect import Parameter
from prefect.engine.executors import DaskExecutor

# task definitions (s3_connection, lin_of_bus, company, final_task) and the
# Flow object are defined elsewhere in the script
start = Parameter("start")
start.set_downstream(s3_connection, flow=flow)
s3_connection.set_downstream(lin_of_bus, flow=flow)
s3_connection.set_downstream(company, flow=flow)
....
end_task = final_task()
end_task.set_upstream(lin_of_bus, flow=flow)
end_task.set_upstream(company, flow=flow)
flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
The schematic diagram below shows the tasks being executed sequentially, not in parallel.
j
Hi Saranya, there are a few reasons why this might be happening; without knowing more I can't be more helpful.
• Does your dask cluster have more than one worker? If there's only one worker/thread to do work, then all tasks will still run sequentially.
• Do your tasks run relatively quickly? Dask's heuristics might determine that it's more efficient to run all tasks sequentially rather than in parallel.
• Do your tasks return large amounts of data? Dask's heuristics might also determine it's more efficient to run sequentially rather than serialize the data between tasks.
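One quick way to rule out the single-worker case is to size the local cluster explicitly; a minimal sketch reusing the flow from the snippet above (not from the original thread, and the worker counts are illustrative):
Copy code
from prefect.engine.executors import DaskExecutor

# cluster_kwargs is forwarded to distributed.LocalCluster, so the number of
# worker processes and threads per worker can be pinned explicitly
executor = DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1})
flow.run(parameters={"start": "hello"}, executor=executor)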
s
@Jim Crist-Harif
1. I don't have a dask cluster. Based on this documentation https://docs.prefect.io/core/idioms/parallel.html, by not specifying a scheduler address the DaskExecutor will create a local Dask cluster and begin executing tasks on it (please correct me if I am wrong). How can I check whether that local Dask cluster is single- or multi-threaded?
2. Each task takes a minute or two.
3. Most of the tasks return <10 KB of data, except 2 tasks which return around 1 to 2 MB of data.
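For reference, a sketch of one way to inspect the defaults dask would use on a given machine (this assumes the executor creates a plain distributed.LocalCluster, which is its default behavior when no address is given):
Copy code
from dask.distributed import LocalCluster

# LocalCluster with no arguments uses the same defaults DaskExecutor falls
# back to; its repr reports the worker and thread counts
cluster = LocalCluster()
print(cluster)
cluster.close()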
j
Hmmm, that is odd. A temporary dask cluster should use roughly the same number of worker processes as you have cores on your computer, so unless you're on a 1-core machine that shouldn't be the problem. Can you run with `cluster_kwargs={"silence_logs": 10}` passed to `DaskExecutor` (10 is logging.DEBUG, so the cluster will emit debug-level logs) and report back with the logs? (You may want to glance through them to remove anything you'd consider private.)
s
@Jim Crist-Harif It didn't help, Jim. This is what I have:
Copy code
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
but it still runs sequentially. (Screenshot attached: Screen Shot 2020-09-01 at 9.07.59 PM)
@Jim Crist-Harif Any suggestions?
j
Sorry, that wasn't a fix; it was to get better logs so we can help you debug. Can you report back with the logs from that run?
s
@Jim Crist-Harif Attached logs
j
Hi Saranya, it looks like you're doing a cloud/server flow run rather than running with `flow.run` (you can tell, since the logs have `prefect.CloudFlowRunner` in them). I assume you've registered the flow elsewhere. To configure the executor to use with a cloud/server flow run, you need to specify the executor as part of the environment. Try doing
Copy code
flow.environment.executor = DaskExecutor()
before registering your flow.
s
@Jim Crist-Harif
1. If I have `flow.environment.executor = DaskExecutor()` set on the environment, will it make the whole flow run in parallel?
2. I have this line in my code. Does CloudFlowRunner override flow.run? Is there a way to make flow.run take precedence?
Copy code
flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
j
`flow.run` is for running locally; the logs you've posted are from a cloud run. Generally you have two options when running Prefect flows:
1. Write a script that builds a flow, then calls `flow.run(...)` at the end. Run the script as `python yourflow.py`. This will use the configuration passed to `flow.run`.
2. Write a script that builds a flow, configures the flow's environment, then calls `flow.register` at the end. Run the script as `python yourflow.py`; this will register your flow. You can then schedule cloud/server flow runs via cloud/server.
See https://docs.prefect.io/orchestration/tutorial/first.html for more information.
> will it make the whole flow run in parallel?
For this flow, yes. Note that environment configuration only affects cloud/server flow runs, while passing `executor` to `flow.run` only affects `flow.run` execution. A minimal sketch of the two scopes follows.
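To make the two scopes concrete, here is a sketch (the flow and project names are hypothetical):
Copy code
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def work():
    pass

with Flow("example") as flow:
    work()

# Option 1: local execution -- the executor passed here applies only to flow.run()
flow.run(executor=DaskExecutor())

# Option 2: cloud/server execution -- the environment's executor applies only
# to orchestrated runs; set it before calling register
flow.environment.executor = DaskExecutor()
flow.register(project_name="my-project")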
s
@Jim Crist-Harif My entire team runs flows in cloud. I am looking for a parallel run only for my flow and do not want to affect any other flow. Is there a way to do that?
j
Configuration set on your flow won't affect other flows. Setting `flow.environment.executor` will only affect that flow.
s
@Jim Crist-Harif I have 2 files (attached):
1. app.py (contains the flow definition)
2. build.py (registers the flow)
I ran it 3 times with the line `flow.environment.executor = DaskExecutor()`: (1) only in app.py, (2) only in build.py, (3) in both. It still runs sequentially. Can you please take a look at those 2 files and tell me what I am missing here?
j
In your build.py file you redefine the environment on line 42, so setting `flow.environment.executor` is lost. I see you re-add it on line 66, but you do that after the call to `register`; you need to do it before the call to `register`. You can then remove setting the executor in `app.py`.
s
Added as suggested. Still sequential 😞
j
With `build=False` you're not rebuilding your flow, so the environment config isn't actually stored in your Docker image. You'll want to:
• drop `storage.build()`
• drop the line setting `build=False`
Alternatively, move the line where you're configuring your executor to after line 42, so it happens before you call `storage.build`. A sketch of both fixes follows.
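A sketch of the two fixes, assuming the general shape of the build.py discussed above (only the relevant lines are shown):
Copy code
from prefect.engine.executors import DaskExecutor

# Fix 1: configure the executor, then let register() build the image itself
flow.environment.executor = DaskExecutor()
flow.register(project_name="fp-aws")  # no storage.build(), no build=False

# Fix 2: configure the executor *before* explicitly building storage,
# then skip the rebuild at registration time
# flow.environment.executor = DaskExecutor()
# flow.storage.build()
# flow.register(project_name="fp-aws", build=False)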
s
@Jim Crist-Harif I tried both options and attached the files for your reference. Am I making any other mistakes?
j
Yeah, a couple of places. Try this:
Copy code
from prefect.environments.storage import Docker
from prefect.environments import KubernetesJobEnvironment
import sys
import os
import yaml

from prefect.engine.executors import DaskExecutor


def build_prefect_flow_image():
    """
    Builds the prefect flow by adding the kubernetes job environment that is specified in the values
    folder for the s3 service account helm templates. It then stores the flow in a docker image.
    It then registers the flow with prefect cloud.
    :return:
    """
    IMAGE_NAME = os.environ["IMAGE_NAME"]
    ECR_BASE_URL = os.environ["ECR_BASE_URL"]
    IMAGE_TAG = os.environ["TAG"]
    PREFECT_LABELS = os.getenv("PREFECT_LABELS", "")
    KUBERNETES_FILE = os.path.join("values", os.environ["K8S_FILE_NAME"])

    # Set the path to be the folder structure that holds the "app" and "values" folder
    sys.path.append(os.getcwd())

    # Set the environment variables to what is in the values folder. If we don't do this, prefect is unhappy.
    # I'm not sure this is the best way to do it, so we may want to consider doing it a different way.
    with open("values/stg.yaml") as file:
        stg = yaml.load(file, Loader=yaml.FullLoader)
        if 'envVars' in stg:
            for var in stg['envVars']:
                os.environ[var] = stg['envVars'][var]

    # Assumes you're running in a directory with an app.py file that exports a flow
    # Import the flow
    from app.app import flow

    # We want to set prefect to create a job with a kubernetes environment that we have set up. This means that the
    # prefect job that is usually created will create another job with the given kubernetes file
    # The kubernetes job file is created in the CICD pipeline. It holds the environment variables, service account,
    # docker image, flow, etc
    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
    flow.environment.labels.update(flow.storage.labels)
    flow.environment.labels.update(PREFECT_LABELS.split(","))

    # Our prefect flow and kubernetes info will be stored in a docker image that we host in AWS
    flow.storage = Docker(
        registry_url=ECR_BASE_URL,
        dockerfile="Dockerfile",
        image_name=IMAGE_NAME,
        image_tag=IMAGE_TAG,
        env_vars=dict(JOB_NAME=os.environ["JOB_NAME"], ENV=os.environ["ENV"]),
    )

    flow.register(project_name="fp-aws")

if __name__ == "__main__":
    build_prefect_flow_image()
s
@Jim Crist-Harif Build failed with the error below.
Copy code
File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 35, in build_prefect_flow_image
250    flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
251TypeError: __init__() got an unexpected keyword argument 'executor'
252ERROR: Job failed: command terminated with exit code 1
`KubernetesJobEnvironment.__init__` has an `executor` argument. Why am I getting that error?
j
It looks like you might be on an older version of prefect. What version are you using?
Copy code
$ prefect version
s
On my local Mac it's 0.13.1, while in the image build it's 0.11.2.
j
Ah, you'll need to update your images to at least 0.12.1. The latest release is 0.13.5.
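If the image is built from Prefect's base image, one way to pin the version is via Docker storage's base_image (the tag format here is an assumption; adjust to the published tags):
Copy code
from prefect.environments.storage import Docker

# base_image pins the prefect version baked into the built image; if you
# build from a custom Dockerfile instead, bump the version in its FROM line
flow.storage = Docker(
    registry_url=ECR_BASE_URL,
    image_name=IMAGE_NAME,
    image_tag=IMAGE_TAG,
    base_image="prefecthq/prefect:0.13.5-python3.7",
)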
s
OK, let me use 0.13.5 for the image.
@Jim Crist-Harif Prefect versions 0.13.5 and 0.13.1 are throwing the following error during the image build. I didn't receive this error with 0.11.2.
Copy code
File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 70, in <module>
    build_prefect_flow_image()
  File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 66, in build_prefect_flow_image
    flow.register(project_name="fp-aws")
@Jim Crist-Harif Any suggestions? Is the above error due to the version upgrade?
j
Sorry for the delayed reply here Saranya, I'm not sure what your error is. On first pass it looks like cloud is rejecting your flow, but the error message isn't getting returned to the user properly. Would you mind opening an issue (https://github.com/PrefectHQ/prefect/issues) with:
• A reproducible example of your problem
• The version of prefect that you're using
Thanks!