# prefect-community
karteekaddanki
Hi guys, I posted a version of this question yesterday. My mapped task, when run through the Cloud API, is not running concurrently when using `LocalEnvironment(executor=DaskExecutor())`. Is this a limitation of the dev version of the Cloud API? Using `LocalDaskExecutor` and running the flow with `flow.run()` results in concurrent execution of the tasks. I am using Docker storage for the flows. How can I debug what's going on in this case?
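(For context, a minimal sketch of the two setups being compared; the flow and task names below are placeholders, not from the original code.)

```python
from prefect import Flow, task
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

@task
def work(x):
    # stand-in for the real mapped task
    return x + 1

with Flow("example") as flow:
    work.map([1, 2, 3])

# Setup 1: registered and triggered through the Cloud API, with the
# executor attached to the flow's environment.
flow.environment = LocalEnvironment(executor=DaskExecutor())

# Setup 2: run locally, passing the executor directly to flow.run().
# flow.run(executor=LocalDaskExecutor())
```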
Jim Crist-Harif
Hi @karteekaddanki, what version of prefect are you running? Do you pass any parameters to `DaskExecutor` on creation?
karteekaddanki
@Jim Crist-Harif I'm running 0.12.3. No, I'm not passing any parameters to `DaskExecutor` on startup. I am just setting `flow.environment = LocalEnvironment(executor=DaskExecutor())` and registering the flow. I originally tried `LocalDaskExecutor` without any success. I do have tags on the mapped tasks; I'm not sure if that is an issue with concurrency in the dev version of Cloud. My tasks also read from a shared global state, but that shouldn't matter as they trigger a `subprocess` running a C++ application.
Jim Crist-Harif
Are your tags resource tags (tags that indicate a dask resource requirement)?
By default dask will start as many workers as there are cores (approximately, it's not exactly a 1-to-1 ratio, there's a heuristic). Do you have more than one core on the executing machine?
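(For reference, a Dask resource tag in Prefect follows a `dask-resource:KEY=VALUE` naming convention, whereas a plain tag is just metadata. A small sketch with hypothetical task names; the resource name "process" is made up for illustration and only constrains scheduling if the Dask workers declare a matching resource.)

```python
from prefect import task

# Plain tag: organisational metadata / future concurrency limits only.
@task(tags=["cpp"])
def plain_tagged_task():
    ...

# Dask resource tag: tells the DaskExecutor that each run of this task
# consumes one unit of a worker resource named "process".
@task(tags=["dask-resource:process=1"])
def resource_tagged_task():
    ...
```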
> Using `LocalDaskExecutor` and running the flow with `flow.run()` results in concurrent execution of the tasks.
> I originally tried `LocalDaskExecutor` without any success.

Does the `LocalDaskExecutor` work successfully for you or not? In what ways does it not work?
karteekaddanki
@Jim Crist-Harif My tags are just stand-ins for future task concurrency limits and look like this: `@task(tags=["cpp"])`. I have ~20 cores. When I ran the flow locally (without the Cloud API) using `flow.run()` and `LocalDaskExecutor`, there were no issues. However, when I trigger the same flow from the Cloud API, my tasks run sequentially with both `LocalDaskExecutor` and `DaskExecutor()`.
Jim Crist-Harif
Ah, that's helpful. How large are the results being returned from these tasks? How large is the data being passed into the tasks?
I have a hunch that https://github.com/PrefectHQ/prefect/pull/2984 will have resolved your issue. If possible, can you try upgrading to 0.12.5 and see if that fixes things?
That may look unrelated, but due to the way prefect was holding onto results, it could result in large amounts of data transfer between tasks, which would mess with dask's scheduling heuristics.
karteekaddanki
@Jim Crist-Harif The results aren't too big, tbh; the data being passed to these tasks is file paths along with some configuration (a dict with a few keys). It's also interesting that `flow.run()` works with `LocalDaskExecutor`, although I get the leaked semaphore warning. Let me update to 0.12.5; it's always a good idea.
@Jim Crist-Harif Updating to 0.12.5 didn't really help. Do I need to run multiple agents to ensure parallelism? I just have one Docker agent running.
Jim Crist-Harif
No, the parallelism should happen at the executor level.
Can you try with a simple flow and see if you get parallelism in that case? Something like:
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
import time

@task
def slow_inc(x):
    # each mapped task sleeps for 2s, so parallelism is easy to spot
    time.sleep(2)
    return x + 1

with Flow("test") as flow:
    res = slow_inc.map(range(20))

flow.environment.executor = DaskExecutor()
flow.register("my-project")
```
karteekaddanki
```
EnvironmentType    LocalEnvironment
```
This is what I see in the Cloud UI. Any chance I might be messing up the executor? Because I am building the flow before I set the executor.
Jim Crist-Harif
Executor information isn't serialized to the UI. Anything you configure on the flow before calling `flow.register` should be properly persisted. You can also check the logs; there should be some executor-specific logs for a flow run.

In my tests I get proper parallelism when mapping on Cloud/Server using a Dask executor, so this might be specific to your setup or your tasks. If a simple flow works successfully for you, we might need your help to get a reproducible example so we can debug further.
karteekaddanki
@Jim Crist-Harif The simple example worked. I've tried simplifying my flow registration, and my code also works. However, I'm trying to register my flows using our deployment tool, and I might have complicated my registration script a bit too much. This is the script with which the concurrency doesn't seem to be working. Do you see anything wrong with it? When I import the script directly and register the flow, it works. Thanks again for your help.
```python
import argparse
import os
import importlib

import prefect
from prefect.environments.storage import Docker
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

# Registers the flows exposed by a module; sets the storage to Docker
# if a base image is given.
if __name__ == "__main__":
    # setup args
    parser = argparse.ArgumentParser(
        description="Runs a flow on trading dates on the exchange.")
    parser.add_argument(
        "--flow", type=str, required=True,
        help="Module path which contains the flow.")
    parser.add_argument("--project", type=str, required=True,
                        help="Project name in which to register the flow.")
    parser.add_argument("--base-image", type=str,
                        help="Name of the base image which is associated with the storage.")
    parser.add_argument("--local-image", action='store_true',
                        help="Option if the docker image is local")
    parser.add_argument("--env", type=str, nargs='*',
                        help="Environment variables to pass to the docker storage.")
    parser.add_argument("--labels", nargs="*",
                        help="Labels to pass to the environment")
    parser.add_argument("--registry-url", type=str,
                        help="A URL to the registry for uploading the flow.")

    args = parser.parse_args()
    envs = {}
    if args.env:
        for e in args.env:
            sv = e.split('=')
            assert len(sv) == 2, "environment variable should be of the form k=v"
            envs[sv[0]] = sv[1]

    # export the variables to os.environ so that the import works
    for k in envs:
        os.environ[k] = envs[k]

    flows = importlib.import_module(args.flow).FLOWS

    if args.base_image:
        storage = Docker(
                env_vars=envs,
                base_image=args.base_image,
                registry_url=args.registry_url,
                local_image=args.local_image)
        for flow in flows:
            path = storage.add_flow(flow)
            print(f"storing flow {flow.name} at {path} in the image.")
        storage = storage.build()
        for flow in flows:
            flow.storage = storage
            flow.environment.executor = DaskExecutor()
            flow.register(project_name=args.project, build=False)
```
Jim Crist-Harif
Hmmm, we generally don't advise doing the storage building manually; usually that would all be done for you in the call to `flow.register()`. Either way, it looks like the issue is that you're setting `DaskExecutor()` as the environment executor after the storage is built, which means it won't be persisted in the flow. Try:
```python
if args.base_image:
    storage = Docker(
            env_vars=envs,
            base_image=args.base_image,
            registry_url=args.registry_url,
            local_image=args.local_image)
    for flow in flows:
        # configure the flow *before* adding it to storage so the
        # executor gets serialized along with the flow
        flow.storage = storage
        flow.environment.executor = DaskExecutor()
        path = storage.add_flow(flow)
        print(f"storing flow {flow.name} at {path} in the image.")
    storage.build()
    for flow in flows:
        flow.register(project_name=args.project, build=False)
```
karteekaddanki
It works. I was going by this https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html as I am trying to create a generic framework where CI/CD can also be done on our flows without exploding the number of containers. But the mistake is obvious now. Thanks for your patience once again.
Riley Hun
Hello folks, would you be so kind as to explain the part about using importlib to extract the flows from other files? I'm trying to do the same thing, but it's been hard to recreate without knowing what your folder tree looks like. Thanks in advance!
karteekaddanki
@Riley Hun Sorry, I was away for a while. My flows reside in a Python module (so that we can version and manage the relevant Python deps in a way that is consistent with how we do things at my firm). Each flow lives in its own module and is imported via importlib before registering. The associated environment is built and the flow is pushed to a Docker registry.
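(A rough, hypothetical sketch of that pattern; the module, flow, and project names are made up for illustration and mirror the `importlib.import_module(args.flow).FLOWS` line in the registration script above.)

```python
# my_flows/daily_job.py -- one module per flow, exposing a FLOWS list:
#
#     from prefect import Flow, task
#
#     with Flow("daily-job") as flow:
#         ...
#
#     FLOWS = [flow]

# register.py -- the deployment script imports the module by dotted path
# (e.g. --flow my_flows.daily_job) and registers everything it exposes.
import importlib

module = importlib.import_module("my_flows.daily_job")
for flow in module.FLOWS:
    flow.register(project_name="my-project")
```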