karteekaddanki
07/27/2020, 1:18 PM
[...] flow.run() locally it seems to work. Here is the code snippet I am using for registering my flow. I am using Docker storage.
if args.base_image:
    storage = Docker(
        base_image=args.base_image,
        registry_url=args.registry_url,
        local_image=args.local_image)
    for flow in flows:
        path = storage.add_flow(flow)
        print(f"storing flow {flow.name} at {path} in the image.")
    storage = storage.build()
    for flow in flows:
        flow.storage = storage
        flow.environment = LocalEnvironment(
            executor=LocalDaskExecutor(),
            labels=args.labels)
        flow.register(project_name=args.project, build=False)
Am I doing something wrong? I am on the developer version of the cloud backend.
EDIT: To be more precise, when I run flow.run() I see concurrency on the mapped tasks, but not when using the cloud API (even for a single flow).
nicholas
07/27/2020, 3:20 PM
[...] RemoteDaskEnvironment instead of the local environment, like this:
from prefect.environments import RemoteDaskEnvironment
###
###
flow.environment = RemoteDaskEnvironment(address="tcp://YOUR_CLUSTER_ADDRESS", labels=args.labels)
karteekaddanki
07/28/2020, 8:51 AM
[...] tcp://127.0.0.1 in my configuration. Any particular reason why the LocalEnvironment isn't concurrent with LocalDaskExecutor?
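
One way to confirm whether mapped tasks actually overlap, independent of Prefect, is to time them. The sketch below uses only the Python standard library. `ThreadPoolExecutor` stands in for a concurrent executor, and `slow_task`, `timed_map`, and the 0.2 s delay are hypothetical names and values chosen for illustration. None of this is Prefect's API.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(x, delay=0.2):
    """Hypothetical stand-in for one mapped task: sleep, then return x + 1."""
    time.sleep(delay)
    return x + 1


def timed_map(inputs, concurrent):
    """Map slow_task over inputs and return (results, elapsed seconds)."""
    start = time.perf_counter()
    if concurrent:
        # One worker per input so every task can run at the same time.
        with ThreadPoolExecutor(max_workers=len(inputs)) as pool:
            results = list(pool.map(slow_task, inputs))
    else:
        # Serial baseline: tasks run one after another.
        results = [slow_task(x) for x in inputs]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    inputs = [1, 2, 3, 4]
    for mode in (False, True):
        results, elapsed = timed_map(inputs, concurrent=mode)
        print(f"concurrent={mode}: {results} in {elapsed:.2f}s")
```

If a run under a given executor takes about as long as the serial baseline, the mapped tasks are most likely executing one after another. A wall time close to a single task's duration suggests they overlap.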