karteekaddanki
07/28/2020, 7:20 PM
My mapped tasks run sequentially on Cloud even though I set `LocalEnvironment(executor=DaskExecutor())`. Is this a limitation of the dev version of the cloud API? Using `LocalDaskExecutor` and running the flow with `flow.run()` results in concurrent execution of the tasks. I am using Docker storage for the flows. How can I debug what's going on in this case?

Jim Crist-Harif
07/28/2020, 7:22 PM
What Prefect version are you running, and are you passing any parameters to `DaskExecutor` on creation?

karteekaddanki
07/28/2020, 7:25 PM
0.12.3. No, I'm not passing any parameters to `DaskExecutor` on startup. I am just setting `flow.environment = LocalEnvironment(executor=DaskExecutor())` and registering the flow. I originally tried `LocalDaskExecutor` without any success. I do have tags on the mapped tasks; not sure if that is an issue with concurrency in the dev version of the cloud. My tasks also read from a shared global state, but that shouldn't matter as they trigger a subprocess running a C++ application.
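(For context, a minimal sketch of the setup being described; the tag name, binary name, and task body here are assumptions, not the actual code:)

```python
import subprocess

from prefect import task


# Hypothetical mapped task: tagged, and shelling out to a C++ binary.
@task(tags=["cpp"])
def run_binary(arg):
    # Each mapped task run launches its own subprocess, so shared global
    # state in the Python process shouldn't prevent concurrent execution.
    subprocess.run(["./my_cpp_app", str(arg)], check=True)
```

Jim Crist-Harif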
07/28/2020, 7:31 PM
> Using `LocalDaskExecutor` and running the flow with `flow.run()` results in concurrent execution of the tasks.
> I originally tried `LocalDaskExecutor` without any success.
Does the `LocalDaskExecutor` work successfully for you or not? In what ways does it not work?

karteekaddanki
07/28/2020, 7:37 PM
My tags look like `@task(tags=["cpp"])`. I have ~20 cores. When I run the flow locally (without the cloud API) using `flow.run()` and `LocalDaskExecutor`, there are no issues. However, when I trigger the same flow from the cloud API, my tasks run sequentially with both `LocalDaskExecutor` and `DaskExecutor()`.
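(A sketch of the local test that worked, assuming an existing `flow` object; `flow.run()` accepts an executor directly:)

```python
from prefect.engine.executors import LocalDaskExecutor

# Running the flow locally with an explicit executor; this is the case
# that showed concurrent execution of the mapped tasks.
state = flow.run(executor=LocalDaskExecutor())
```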
Jim Crist-Harif
07/28/2020, 7:38 PM
To confirm: does `flow.run()` with a `LocalDaskExecutor` work for you? It may also be worth updating to the latest release (0.12.5).

karteekaddanki
07/28/2020, 7:43 PM
`flow.run()` works with `LocalDaskExecutor`, although I get the leaked-semaphore warning. Let me update to 0.12.5; it's always a good idea.

Jim Crist-Harif
07/28/2020, 8:01 PM
```python
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor
import time


@task
def slow_inc(x):
    time.sleep(2)
    return x + 1


with Flow("test") as flow:
    res = slow_inc.map(range(20))

flow.environment.executor = DaskExecutor()
flow.register("my-project")
```
karteekaddanki
07/28/2020, 8:03 PM
`Environment Type: LocalEnvironment` is what I see in the cloud UI. Any chance I might be messing up the executor? Because I am building the flow before I set the executor.

Jim Crist-Harif
07/28/2020, 8:04 PM
Any executor set before calling `flow.register` should be properly persisted. You can also check the logs; there should be some executor-specific logs for a flow run.

karteekaddanki
07/28/2020, 8:39 PM
```python
import argparse
import importlib
import os

import prefect
from prefect.environments.storage import Docker
from prefect.environments import LocalEnvironment
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor

# Registers the flows exported by a module; sets the storage to Docker
# if a base image is given.
if __name__ == "__main__":
    # setup args
    parser = argparse.ArgumentParser(
        description="Runs a flow on trading dates on the exchange.")
    parser.add_argument(
        "--flow", type=str, required=True,
        help="Module path which contains the flow.")
    parser.add_argument("--project", type=str, required=True,
                        help="Project name in which to register the flow.")
    parser.add_argument("--base-image", type=str,
                        help="Name of the base image which is associated with the storage.")
    parser.add_argument("--local-image", action="store_true",
                        help="Option if the docker image is local.")
    parser.add_argument("--env", type=str, nargs="*",
                        help="Environment variables to pass to the docker storage.")
    parser.add_argument("--labels", nargs="*",
                        help="Labels to pass to the environment.")
    parser.add_argument("--registry-url", type=str,
                        help="A URL to the registry for uploading the flow.")
    args = parser.parse_args()

    envs = {}
    if args.env:
        for e in args.env:
            sv = e.split("=")
            assert len(sv) == 2, "environment variable should be of the form k=v"
            envs[sv[0]] = sv[1]

    # Export the variables to os.environ so that the import works.
    for k in envs:
        os.environ[k] = envs[k]

    flows = importlib.import_module(args.flow).FLOWS

    if args.base_image:
        storage = Docker(
            env_vars=envs,
            base_image=args.base_image,
            registry_url=args.registry_url,
            local_image=args.local_image)
        for flow in flows:
            path = storage.add_flow(flow)
            print(f"storing flow {flow.name} at {path} in the image.")
        storage = storage.build()

    for flow in flows:
        flow.storage = storage
        flow.environment.executor = DaskExecutor()
        flow.register(project_name=args.project, build=False)
```
Jim Crist-Harif
07/28/2020, 8:45 PM
Normally storage is built as part of `flow.register()`. Either way, it looks like the issue is likely that you're setting the `DaskExecutor()` as the environment executor after the storage is built, which would mean that it won't be persisted in the flow.
Try:
```python
if args.base_image:
    storage = Docker(
        env_vars=envs,
        base_image=args.base_image,
        registry_url=args.registry_url,
        local_image=args.local_image)
    for flow in flows:
        flow.storage = storage
        flow.environment.executor = DaskExecutor()
        path = storage.add_flow(flow)
        print(f"storing flow {flow.name} at {path} in the image.")
    storage.build()

for flow in flows:
    flow.register(project_name=args.project, build=False)
```
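(For a single flow, a simpler variant of the same idea is to set everything first and let `flow.register()` build the image itself; a minimal sketch, assuming the same `envs` and `args` as above:)

```python
# Set storage and executor before registration; flow.register() builds
# the Docker storage by default (build=True), so the ordering problem
# above no longer arises.
flow.storage = Docker(
    env_vars=envs,
    base_image=args.base_image,
    registry_url=args.registry_url,
    local_image=args.local_image)
flow.environment.executor = DaskExecutor()
flow.register(project_name=args.project)
```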
karteekaddanki
07/28/2020, 8:54 PM

Riley Hun
08/13/2020, 10:20 PM

karteekaddanki
08/20/2020, 8:39 AM