Brad I
09/03/2021, 5:41 AM
[…] `DaskExecutor` not getting the env vars set in the run config, this has caused other issues in the way we have been deploying flows to different environments (test, dev, prod, etc.). Previously, using the `LocalDaskExecutor`, we could set the executor on the flow itself and it was generic enough to work across all of our environments. Now we also need to change the Dask config depending on our target environment, but it seems like setting the executor outside of the `with Flow()` definition just gets ignored. I believe this has something to do with the `storage` and `run_config` being serialized with the flow, while the `executor` currently is not. An example flow and registration script is attached inside this thread. Does anyone know of a workaround for this issue, or have a better way to do this?
Brad I
09/03/2021, 5:41 AM
import time
from datetime import datetime
from slugify import slugify
import prefect
from prefect import Parameter, Flow, task, unmapped
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Local
from dask_kubernetes import KubeCluster, make_pod_spec
@task
def pre_process_batch(run_batch):
    """Log how many entries the batch holds and return that count.

    Args:
        run_batch: The batch of run parameters; only its length is used.

    Returns:
        ``len(run_batch)``.
    """
    logger = prefect.context.get("logger")
    total = len(run_batch)
    logger.info(f"Simulating {total} project combinations")
    return total
@task
def simulate_project(run_params, sim_time, total):
    """Stand in for one simulation by sleeping, then report completion.

    Args:
        run_params: The mapped batch entry for this run (not read here).
        sim_time: Number of seconds to sleep.
        total: Batch size, echoed back in the result.

    Returns:
        A dict with ``"instance"`` (``prefect.context.map_index`` plus one),
        ``"total"`` and ``"finished_ts"`` (``datetime.now()`` in ISO format).
    """
    logger = prefect.context.get("logger")
    logger.info(f"Simulating ... ")
    time.sleep(sim_time)
    return {
        "instance": prefect.context.map_index + 1,
        "total": total,
        "finished_ts": datetime.now().isoformat(),
    }
@task
def store_output(result):
    """Log one simulation result as ``instance/total - finished_ts``.

    Args:
        result: Mapping with ``"instance"``, ``"total"`` and
            ``"finished_ts"`` keys.
    """
    logger = prefect.context.get("logger")
    logger.info(f"{result['instance']}/{result['total']} - {result['finished_ts']}")
# Flow graph: count the batch once, map the simulation over every batch
# entry, then map the logging task over the simulation results.
with Flow("Scaling Test Flow") as flow:
    run_batch = Parameter("run_batch", default=[1, 2, 3, 4])
    sim_time = Parameter("sim_time", default=10)

    total = pre_process_batch(run_batch)
    results = simulate_project.map(
        run_batch,
        sim_time=unmapped(sim_time),
        total=unmapped(total),
    )
    store_output.map(results)
if __name__ == "__main__":
    # This is a stripped down example of our deployment script.
    # Our CI builds/pushes the docker image and then registers
    # all flows in a loop into the test environment.
    # Merge of a PR registers to dev, tags release to prod.
    from pathlib import Path

    flow.validate()

    # Local storage (already built into docker image by CI process).
    # The in-image path is /src/ plus this file's path from its "workflows"
    # component onward; parts.index raises ValueError if there is none.
    parts = Path(__file__).resolve().parts
    path = str(Path("/src/") / Path(*parts[parts.index("workflows"):]))
    flow.storage = Local(stored_as_script=True, path=path, add_default_labels=False)

    # Different deployment environments
    dev = {
        "GCS_BUCKET": "dev-bucket",
        "BQ_DATASET": "dev-dataset",
        "BQ_TABLE": "results",
    }
    test = {
        "GCS_BUCKET": "test-bucket",
        "BQ_DATASET": "test-dataset",
        "BQ_TABLE": "results",
    }

    def _deploy(
        *,
        project_name,
        image,
        env,
        worker_cpu,
        worker_memory,
        worker_threads,
        max_workers,
    ):
        """Point ``flow`` at one environment and register it there.

        Args:
            project_name: Project to register into; also used as the second
                run-config label (the two are the same string in every
                environment deployed below).
            image: Docker image for both the flow-run job and Dask workers.
            env: Environment variables for the job and the Dask pod template.
            worker_cpu: CPU request for each Dask worker pod.
            worker_memory: Memory request for each Dask worker pod.
            worker_threads: Threads per Dask worker.
            max_workers: Upper bound for adaptive scaling (lower bound is 1).
        """
        # K8s run config
        flow.run_config = KubernetesRun(
            image_pull_policy="Always",
            labels=["k8s", project_name],
            image=image,
            cpu_request="1000m",
            memory_request="2Gi",
            env=env,
        )
        # Executor (Dask pod template requires the env dict again).
        # NOTE(review): per the thread this script was posted in, an executor
        # assigned here appears to be ignored at run time -- presumably
        # because only storage and run_config are registered with the flow.
        # Confirm against the Prefect executor documentation.
        flow.executor = DaskExecutor(
            cluster_class=lambda: KubeCluster(
                name=slugify(flow.name),
                pod_template=make_pod_spec(
                    image=image,
                    cpu_request=worker_cpu,
                    memory_request=worker_memory,
                    threads_per_worker=worker_threads,
                    env=env,
                ),
            ),
            adapt_kwargs={
                "minimum": 1,
                "maximum": max_workers,
            },
        )
        flow.register(
            project_name=project_name,
            set_schedule_active=False,
        )

    ##
    # Deploy to test environment
    ##
    _deploy(
        project_name="test-workflows",
        image="us.gcr.io/project/workflows:feature-branch",
        env=test,
        worker_cpu="2000m",
        worker_memory="4Gi",
        worker_threads=1,
        max_workers=4,
    )

    ##
    # Deploy to dev environment
    ##
    _deploy(
        project_name="dev-workflows",
        image="us.gcr.io/project/workflows:main",
        env=dev,
        worker_cpu="4000m",
        worker_memory="8Gi",
        worker_threads=2,
        max_workers=8,
    )
Kevin Kho
from prefect import Flow, task
from prefect.executors import LocalExecutor, LocalDaskExecutor
from prefect.run_configs import LocalRun
# Initial flow; the name ``flow`` is rebound by every loop iteration below.
with Flow("a") as flow:
    task(lambda x: x + 1)(1)

executors = [LocalExecutor, LocalDaskExecutor, LocalExecutor]
flows = []
# Build and register one flow per executor class, keeping a copy of each so
# they can be inspected after the shared ``flow`` name is deleted.
for i, executor_cls in enumerate(executors):
    with Flow("executor" + str(i)) as flow:
        task(lambda x: x + 1)(i)
    flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = executor_cls()
    flow.register("dsdc")
    flows.append(flow.copy())
    del flow

x, y, z = flows
When I run `y`, which is flow `executor1`, I get the `LocalDaskExecutor` even if it was overridden by the loop.
Kevin Kho
Brad I
09/03/2021, 8:25 PM
[…] `run_config` through to the `DaskExecutor` and `KubeCluster`. Then we would be able to load a more generic version of the executor like:
def _make_cluster():
    """Build the KubeCluster lazily, reading its settings from prefect.context.

    NOTE(review): assumes ``prefect.context`` exposes ``flow_name`` and
    ``image`` when the cluster is created -- this is the behaviour being
    proposed in the surrounding message; confirm it is actually available.
    """
    worker_pod = make_pod_spec(
        image=prefect.context.image,
        cpu_request="4000m",
        memory_request="8Gi",
        threads_per_worker=2,
    )
    return KubeCluster(
        name=slugify(prefect.context.flow_name),
        pod_template=worker_pod,
    )


# Environment-agnostic executor: scales adaptively between 1 and 8 workers.
executor = DaskExecutor(
    cluster_class=_make_cluster,
    adapt_kwargs={"minimum": 1, "maximum": 8},
)
with Flow(
"Scaling Flow",
executor=executor
) as flow:
Brad I
09/03/2021, 8:26 PMKevin Kho
Kevin Kho
Marvin
09/03/2021, 8:30 PM