Thread
#prefect-community
    Saranya Elumalai

    2 years ago
    Good morning everyone, I wanted to run tasks in parallel using a local Dask cluster (https://docs.prefect.io/core/idioms/parallel.html), but the tasks are executed sequentially. Why is that? Here is my code snippet:
    start = Parameter("start")
    start.set_downstream(s3_connection, flow=flow)
    s3_connection.set_downstream(lin_of_bus, flow=flow)
    s3_connection.set_downstream(company, flow=flow)
    ....
    end_task = final_task()
    end_task.set_upstream(lin_of_bus, flow=flow)
    end_task.set_upstream(company, flow=flow)
    flow.run(parameters={"start": "hello"}, executor=DaskExecutor())
    The schematic diagram below shows the tasks being executed sequentially rather than in parallel.
    Jim Crist-Harif

    2 years ago
    Hi Saranya, there are a few reasons why this might be happening; without knowing more I can't be more helpful.
    • Does your Dask cluster have more than one worker? If there's only one worker/thread to do work, then all tasks will still be sequential.
    • Do your tasks run relatively quickly? Dask's heuristics might determine that it's more efficient to run all tasks sequentially rather than in parallel.
    • Do your tasks return large amounts of data? Dask's heuristics here might also determine it's more efficient to run sequentially rather than serialize the data between tasks.
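The first point above (a single worker forces sequential execution) can be seen without Dask at all. The following is a minimal sketch using only Python's standard library thread pool as a stand-in for a Dask cluster; `peak_concurrency` is a hypothetical helper name, and the 0.2 second sleep is an arbitrary placeholder for real task work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(max_workers, n_tasks=2, task_seconds=0.2):
    """Run n_tasks sleeping tasks on a pool and return how many ran at once at peak."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def task():
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(task_seconds)  # stand-in for real work
        with lock:
            running -= 1

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for _ in range(n_tasks)]
        for future in futures:
            future.result()  # re-raise any exception from the task
    return peak


one_worker_peak = peak_concurrency(max_workers=1)
two_worker_peak = peak_concurrency(max_workers=2)
print(one_worker_peak, two_worker_peak)
```

With one worker the two independent tasks never overlap, no matter how the dependency graph is drawn; with two workers they do. This says nothing about Dask's scheduling heuristics from the other two points.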
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif
    1. I don't have a Dask cluster. Based on this documentation (https://docs.prefect.io/core/idioms/parallel.html), by not specifying a scheduler address the DaskExecutor will create a local Dask cluster and begin executing tasks on it (please correct me if I am wrong). Where can I find out whether that local Dask cluster is single- or multi-threaded?
    2. Each task takes a minute or two.
    3. Most of the tasks return <10KB of data, except 2 tasks which return around 1 to 2MB of data.
    Jim Crist-Harif

    2 years ago
    Hmmm, that is odd. A temporary Dask cluster should use roughly the same number of worker processes as you have cores on your computer, so unless you're on a 1 core machine that shouldn't be the problem. Can you run with cluster_kwargs={"silence_logs": 10} passed to DaskExecutor and report back with the logs (you may want to glance through them to remove anything you'd consider private).
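To rule out the one-core case mentioned above, it may help to check how many cores Python can see on the machine that runs the flow. This is a minimal standard-library sketch; how Dask divides those cores between worker processes and threads is not shown and may differ by version.

```python
import os

# os.cpu_count() can return None when the count cannot be determined.
cores = os.cpu_count() or 1
print(f"logical cores visible to Python: {cores}")
```

Inside a container with CPU limits, this number may reflect the host rather than the limit, so it is worth running the check in the same place the flow actually executes.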
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif It didn't help, Jim. This is what I have:
    flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
    but it still runs sequentially. [Attachment: Screen Shot 2020-09-01 at 9.07.59 PM]
    @Jim Crist-Harif Any suggestions?
    Jim Crist-Harif

    2 years ago
    Sorry, that wasn't a fix, that was to get better logs so we can help you debug. Can you report back with the logs from that run?
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif Attached logs
    Jim Crist-Harif

    2 years ago
    Hi Saranya, it looks like you're doing a cloud/server flow run rather than running with flow.run (you can tell, since the logs have prefect.CloudFlowRunner in them). I assume you've registered the flow elsewhere. To configure the executor to use with a cloud/server flow run, you need to specify the executor as part of the environment. Try doing
    flow.environment.executor = DaskExecutor()
    before registering your flow.
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif
    1. If I set flow.environment.executor = DaskExecutor() on the environment, will it make all flows run in parallel?
    2. I have this line in my code. Does CloudFlowRunner override flow.run? Is there a way to make flow.run take precedence?
    flow.run(parameters={"start": "hello"}, executor=DaskExecutor(cluster_kwargs={"silence_logs": 10}))
    Jim Crist-Harif

    2 years ago
    flow.run is for running locally; the logs you've posted are from a cloud run. Generally you have two options when running Prefect flows:
    1. Write a script that builds a flow, then calls flow.run(...) at the end. Run the script as python yourflow.py. This will use the configuration in flow.run.
    2. Write a script that builds a flow, configures the flow's environment, then calls flow.register at the end. Run the script as python yourflow.py; this will register your flow. You can then schedule cloud/server flow runs via cloud/server. See https://docs.prefect.io/orchestration/tutorial/first.html for more information.
    "will it make all flow to run parallel?"
    For this flow, yes. Note that environment configuration only affects cloud/server flow runs, while passing executor to flow.run only affects flow.run execution.
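The split described above can be pictured with a toy model. The classes below are hypothetical stand-ins written for illustration, not Prefect's own classes, and the executor names are plain strings. The sketch only mirrors the rule stated in the message: a local run uses the executor argument, while a cloud/server run uses whatever the flow's environment carries.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyEnvironment:
    # Placeholder default; stands for "no parallel executor configured".
    executor: str = "LocalExecutor"


@dataclass
class ToyFlow:
    environment: ToyEnvironment = field(default_factory=ToyEnvironment)

    def run(self, executor: Optional[str] = None) -> str:
        """Toy local run: uses the executor argument, never the environment."""
        return executor or "LocalExecutor"

    def cloud_run(self) -> str:
        """Toy cloud/server run: uses only the environment's executor."""
        return self.environment.executor


flow = ToyFlow()
local_choice = flow.run(executor="DaskExecutor")  # affects this local run only
cloud_before = flow.cloud_run()                   # environment not configured yet
flow.environment.executor = "DaskExecutor"        # per-flow environment setting
cloud_after = flow.cloud_run()
print(local_choice, cloud_before, cloud_after)
```

In this model, passing an executor to `run` has no effect on `cloud_run`, which matches the symptom in the thread: the `executor=` argument was set, but the logs came from a cloud run.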
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif My entire team runs flows in cloud. I want parallel runs only for my flow and do not want to affect any other flow. Is there a way to do that?
    Jim Crist-Harif

    2 years ago
    Configuration set on your flow won't affect other flows. Setting flow.environment.executor will only affect that flow.
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif I have 2 files, both attached:
    1. app.py (contains the flow information)
    2. build.py (registers the flow)
    I ran 3 times with the line flow.environment.executor = DaskExecutor() placed (1) only in app.py, (2) only in build.py, and (3) in both files. It still runs sequentially. Can you please take a look at those 2 files and tell me what I am missing here?
    Jim Crist-Harif

    2 years ago
    In your build.py file you redefine the environment on line 42, so the flow.environment.executor setting is lost. I see you re-add it on line 66, but you do that after the call to register; you need to do that before the call to register. You can then remove setting the executor in app.py.
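The ordering problem described above can be illustrated with plain dictionaries. This is a toy sketch, not Prefect code: `register` here is a hypothetical stand-in that simply snapshots the flow's configuration at call time, and `job.yaml` is a placeholder file name.

```python
import copy


def register(flow):
    """Toy stand-in for registration: snapshot the config as it is right now."""
    return copy.deepcopy(flow)


# Problematic order: set executor, replace the environment, register, re-add.
flow = {"environment": {"executor": "DaskExecutor"}}
flow["environment"] = {"job_spec_file": "job.yaml"}  # new environment, executor lost
registered_wrong = register(flow)
flow["environment"]["executor"] = "DaskExecutor"     # too late, snapshot already taken

# Working order: build the final environment first, then register.
flow = {"environment": {"job_spec_file": "job.yaml", "executor": "DaskExecutor"}}
registered_right = register(flow)

print("executor" in registered_wrong["environment"])
print(registered_right["environment"]["executor"])
```

Anything changed on the flow after the snapshot is taken never reaches the registered copy, which is why the executor has to be set after the environment is redefined and before the call to register.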
    Saranya Elumalai

    2 years ago
    Added as suggested. still sequential 😞
    Jim Crist-Harif

    2 years ago
    With build=False you're not rebuilding your flow, so the environment config isn't actually stored in your docker image. You'll want to:
    • drop storage.build()
    • drop the line setting build=False
    Alternatively, move the line where you're configuring your executor to after line 42 so it happens before you call storage.build.
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif I tried both options and attached files for your reference. Am I making any other mistakes?
    Jim Crist-Harif

    2 years ago
    Yeah, a couple places. Try this:
    from prefect.environments.storage import Docker
    from prefect.environments import KubernetesJobEnvironment
    import sys
    import os
    import yaml
    
    from prefect.engine.executors import DaskExecutor
    
    
    def build_prefect_flow_image():
        """
        Builds the prefect flow by adding the kubernetes job environment that is specified in the values
        folder for the s3 service account helm templates. It then stores the flow in a docker image.
        It then registers the flow with prefect cloud.
        :return:
        """
        IMAGE_NAME = os.environ["IMAGE_NAME"]
        ECR_BASE_URL = os.environ["ECR_BASE_URL"]
        IMAGE_TAG = os.environ["TAG"]
        PREFECT_LABELS = os.getenv("PREFECT_LABELS", "")
        KUBERNETES_FILE = os.path.join("values", os.environ["K8S_FILE_NAME"])
    
        # Set the path to be the folder structure that holds the "app" and "values" folder
        sys.path.append(os.getcwd())
    
        # Set the environment variables to what is in the values folder. If we don't do this, prefect is an unhappy technology
        # I'm not sure this is the best way to do it, so we may want to consider if we should do it a different way
        with open("values/stg.yaml") as file:
            stg = yaml.load(file, Loader=yaml.FullLoader)
            if 'envVars' in stg:
                for var in stg['envVars']:
                    os.environ[var] = stg['envVars'][var]
    
        # Assumes you're running in a directory with an app.py file that exports a flow
        # Import the flow
        from app.app import flow
    
        # We want to set prefect to create a job with a kubernetes environment that we have set up. This means that the
        # prefect job that is usually created will create another job with the given kubernetes file
        # The kubernetes job file is created in the CICD pipeline. It holds the environment variables, service account,
        # docker image, flow, etc
        flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
        flow.environment.labels.update(flow.storage.labels)
        flow.environment.labels.update(PREFECT_LABELS.split(","))
    
        # Our prefect flow and kubernetes info will be stored in a docker image that we host in AWS
        flow.storage = Docker(
            registry_url=ECR_BASE_URL,
            dockerfile="Dockerfile",
            image_name=IMAGE_NAME,
            image_tag=IMAGE_TAG,
            env_vars=dict(JOB_NAME=os.environ["JOB_NAME"], ENV=os.environ["ENV"]),
        )
    
        flow.register(project_name="fp-aws")
    
    if __name__ == "__main__":
        build_prefect_flow_image()
    Saranya Elumalai

    2 years ago
    @Jim Crist-Harif Build failed with below error.
    File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 35, in build_prefect_flow_image
        flow.environment = KubernetesJobEnvironment(job_spec_file=KUBERNETES_FILE, executor=DaskExecutor())
    TypeError: __init__() got an unexpected keyword argument 'executor'
    ERROR: Job failed: command terminated with exit code 1
    The KubernetesJobEnvironment __init__ has an executor argument. Why am I getting that error?
    Jim Crist-Harif

    2 years ago
    It looks like you might be on an older version of prefect, what version are you using?
    $ prefect version
    Saranya Elumalai

    2 years ago
    On my local Mac it's 0.13.1, while in the image build it's 0.11.2.
    Jim Crist-Harif

    2 years ago
    Ah, you'll need to update your images to at least 0.12.1. The latest release is 0.13.5.
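A mismatch like this (0.13.1 locally, 0.11.2 in the image) can be caught early by comparing the installed version against the minimum before building. The sketch below is one way to do that comparison with the standard library only; `parse_version` and `meets_minimum` are hypothetical helper names, and the version strings are the ones quoted in this thread. It assumes plain dotted numeric versions and does not handle pre-release suffixes.

```python
def parse_version(text):
    """Turn '0.13.5' into (0, 13, 5) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


MINIMUM = "0.12.1"  # minimum version named in the message above


def meets_minimum(installed, minimum=MINIMUM):
    """Return True if the installed version is at least the minimum."""
    return parse_version(installed) >= parse_version(minimum)


for candidate in ("0.11.2", "0.13.1", "0.13.5"):
    print(candidate, meets_minimum(candidate))
```

Comparing tuples of integers avoids the trap of comparing raw strings, where "0.9.0" would sort after "0.12.1".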
    Saranya Elumalai

    2 years ago
    OK, let me use 0.13.5 for the image.
    @Jim Crist-Harif Prefect versions 0.13.5 and 0.13.1 both throw the following error during the image build. I didn't get this error with 0.11.2.
    File "/usr/lib/python3.7/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
      File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 70, in <module>
        build_prefect_flow_image()
      File "/builds/heb-engineering/teams/data-solutions-engineering/foundational-projects-team/heb-dsol-foundational-projects/fp-etl-aws/scripts/prefect/build.py", line 66, in build_prefect_flow_image
        flow.register(project_name="fp-aws")
    @Jim Crist-Harif Any suggestions about the above error, which appeared after the version upgrade?
    Jim Crist-Harif

    2 years ago
    Sorry for the delayed reply here Saranya, I'm not sure what your error is. On first pass it looks like cloud is rejecting your flow, but the error message isn't getting returned to the user properly. Would you mind opening an issue (https://github.com/PrefectHQ/prefect/issues) with:
    • A reproducible example of your problem
    • The version of prefect that you're using
    Thanks!
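For anyone reading the traceback above, the message "Expecting value: line 1 column 1 (char 0)" is what Python's json module reports when the text it is asked to decode is empty or does not start with valid JSON. The sketch below reproduces only that message; it does not diagnose why the response body was not JSON in this case, and `try_decode` is a hypothetical helper name.

```python
import json


def try_decode(body):
    """Return (parsed_value, None) on success or (None, error_message) on failure."""
    try:
        return json.loads(body), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


# An empty body and an HTML body both fail at the very first character.
empty_result = try_decode("")
html_result = try_decode("<html>error page</html>")
ok_result = try_decode('{"ok": true}')
print(empty_result[1])
print(html_result[1])
print(ok_result[0])
```

Including the raw response body (if it can be captured) in the issue report would likely make the underlying rejection easier to identify.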