# ask-community
b
Follow up from my message above about the `DaskExecutor` not getting the env vars set in the run config: this has caused other issues in the way we have been deploying flows to different environments (test, dev, prod, etc.). Previously, using the `LocalDaskExecutor`, we could set the executor on the flow itself and it was generic enough to work across all of our environments. Now we also need to change the Dask config depending on our target environment, but setting the executor outside of the `with Flow()` definition just gets ignored. I believe this has something to do with the `storage` and `run_config` being serialized with the flow while the `executor` currently is not. An example flow and registration script is attached in this thread. Does anyone know of a workaround for this issue, or have a better way to do this?
```python
import time
from datetime import datetime
from slugify import slugify
import prefect
from prefect import Parameter, Flow, task, unmapped
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Local
from dask_kubernetes import KubeCluster, make_pod_spec


@task
def pre_process_batch(run_batch):
    logger = prefect.context.get("logger")
    total = len(run_batch)
    logger.info(f"Simulating {total} project combinations")
    return total


@task
def simulate_project(run_params, sim_time, total):
    logger = prefect.context.get("logger")
    logger.info(f"Simulating run {run_params} ...")
    time.sleep(sim_time)
    return {
        "instance": prefect.context.map_index+1,
        "total": total,
        "finished_ts": datetime.now().isoformat()
    }


@task
def store_output(result):
    logger = prefect.context.get("logger")
    logger.info(f"{result['instance']}/{result['total']} - {result['finished_ts']}")


with Flow(
    "Scaling Test Flow"
) as flow:
    run_batch = Parameter("run_batch", default=[1, 2, 3, 4])
    sim_time = Parameter("sim_time", default=10)
    total = pre_process_batch(run_batch)
    results = simulate_project.map(run_batch, sim_time=unmapped(sim_time), total=unmapped(total))
    store_output.map(results)


if __name__ == "__main__":
    # This is a stripped down example of our deployment script.
    # Our CI builds/pushes the docker image and then registers
    # all flows in a loop into the test environment.
    # Merge of a PR registers to dev, tags release to prod.
    from pathlib import Path

    flow.validate()

    # Local storage (already built into docker image by CI process)
    parts = Path(__file__).resolve().parts
    path = str(Path("/src/") / Path(*parts[parts.index("workflows"):]))
    flow.storage = Local(stored_as_script=True, path=path, add_default_labels=False)

    # Different deployment environments
    dev = {
        "GCS_BUCKET": "dev-bucket",
        "BQ_DATASET": "dev-dataset",
        "BQ_TABLE": "results"
    }

    test = {
        "GCS_BUCKET": "test-bucket",
        "BQ_DATASET": "test-dataset",
        "BQ_TABLE": "results"
    }

    ##
    # Deploy to test environment
    ##

    # K8s run config
    flow.run_config = KubernetesRun(
        image_pull_policy="Always",
        labels=['k8s', 'test-workflows'],
        image="<http://us.gcr.io/project/workflows:feature-branch|us.gcr.io/project/workflows:feature-branch>",
        cpu_request="1000m",
        memory_request="2Gi",
        env=test
    )

    # Executor (Dask pod template requires the env dict again)
    flow.executor = DaskExecutor(
        cluster_class=lambda: KubeCluster(
            name=slugify(flow.name),
            pod_template=make_pod_spec(
                image="us.gcr.io/project/workflows:feature-branch",
                cpu_request="2000m",
                memory_request="4Gi",
                threads_per_worker=1,
                env=test,
            )
        ),
        adapt_kwargs={
            "minimum": 1,
            "maximum": 4,
        },
    )

    flow.register(
        project_name="test-workflows",
        set_schedule_active=False,
    )

    ##
    # Deploy to dev environment
    ##

    # K8s run config
    flow.run_config = KubernetesRun(
        image_pull_policy="Always",
        labels=['k8s', 'dev-workflows'],
        image="<http://us.gcr.io/project/workflows:main|us.gcr.io/project/workflows:main>",
        cpu_request="1000m",
        memory_request="2Gi",
        env=dev
    )

    # Executor (Dask pod template requires the env dict again)
    flow.executor = DaskExecutor(
        cluster_class=lambda: KubeCluster(
            name=slugify(flow.name),
            pod_template=make_pod_spec(
                image="<http://us.gcr.io/project/workflows:main|us.gcr.io/project/workflows:main>",
                cpu_request="4000m",
                memory_request="8Gi",
                threads_per_worker=2,
                env=dev,
            )
        ),
        adapt_kwargs={
            "minimum": 1,
            "maximum": 8,
        },
    )

    flow.register(
        project_name="dev-workflows",
        set_schedule_active=False,
    )
```
k
Your thinking is definitely right. The executor is not serialized with the flow; it is kept in storage. When the flow gets loaded, Prefect goes in and looks for the executor there. I think it still might be doable in your loop, though. The key is that the flow and executor must be kept in memory. Look at this example:
```python
from prefect import Flow, task
from prefect.executors import LocalExecutor, LocalDaskExecutor
from prefect.run_configs import LocalRun

with Flow("a") as flow:
    task(lambda x: x + 1)(1)

executors = [LocalExecutor, LocalDaskExecutor, LocalExecutor]

flows = []
for i in range(3):
    with Flow("executor"+str(i)) as flow:
        task(lambda x: x + 1)(i)
    flow.run_config = LocalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
    flow.executor = executors[i]()
    flow.register("dsdc")
    flows.append(flow.copy())

del flow
x, y, z = flows
```
When I run `y`, which is flow `executor1`, I get the `LocalDaskExecutor` even though it was overridden later in the loop. Does that help?
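For context on why assignments under the main guard get lost: with script-based storage, the runtime re-executes the flow script as a plain module to rebuild the `Flow` object, so code under `if __name__ == "__main__":` never runs and `flow.executor` stays at its default. A minimal, conceptual sketch (the `flow_script.py` file and its contents are hypothetical):
```python
from prefect import Flow

# flow_script.py (hypothetical) contains, in outline:
#
#     with Flow("Scaling Test Flow") as flow:
#         ...
#     if __name__ == "__main__":
#         flow.executor = DaskExecutor(...)  # only runs at registration time
#
# At flow-run time the script is re-executed as a module, not as __main__:
namespace = {"__name__": "flow_script"}
with open("flow_script.py") as f:
    exec(compile(f.read(), "flow_script.py", "exec"), namespace)

# The rebuilt flow never saw the executor assignment inside the main guard.
flow = next(v for v in namespace.values() if isinstance(v, Flow))
print(flow.executor)  # None, so the default executor is used at run time
```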
b
It may help us come up with a workaround that can load different executors and groups of env vars, but it is pretty awkward, since each flow needs to know up front all of the different places it will be deployed to. We were hoping that the flows could define their needed scaling params (cpu/mem/min/max workers) and our CI could separately register the same flow into different environments. It would really help if the Prefect agent were able to pass the env vars in the `run_config` through to the `DaskExecutor` and `KubeCluster`. Then we would be able to load a more generic version of the executor like:
```python
executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        name=slugify(prefect.context.flow_name),
        pod_template=make_pod_spec(
            image=prefect.context.image,
            cpu_request="4000m",
            memory_request="8Gi",
            threads_per_worker=2,
        )
    ),
    adapt_kwargs={
        "minimum": 1,
        "maximum": 8,
    },
)

with Flow(
    "Scaling Flow",
    executor=executor
) as flow:
    ...
```
Let me know if that makes sense and is something you’re looking to enhance. Thanks @Kevin Kho!
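For what it's worth, one possible interim workaround along these lines: the `cluster_class` callable runs at flow-run time inside the Kubernetes job, where the `KubernetesRun` env vars are already applied, so it can read the environment with `os.environ` instead of baking the dict in at registration. A minimal sketch, assuming the run config sets the variables below; `WORKER_IMAGE` is a hypothetical variable used to pick the image:
```python
import os

import prefect
from prefect.executors import DaskExecutor
from slugify import slugify
from dask_kubernetes import KubeCluster, make_pod_spec


def make_cluster():
    # Runs in the flow-run pod, where the KubernetesRun env vars should
    # already be set, so they can be forwarded to the Dask worker pods.
    keys = ("GCS_BUCKET", "BQ_DATASET", "BQ_TABLE")
    env = {k: os.environ[k] for k in keys if k in os.environ}
    return KubeCluster(
        name=slugify(prefect.context.flow_name),
        pod_template=make_pod_spec(
            # WORKER_IMAGE is a hypothetical env var set by the run config.
            image=os.environ.get("WORKER_IMAGE", "us.gcr.io/project/workflows:main"),
            cpu_request="2000m",
            memory_request="4Gi",
            env=env,
        ),
    )


executor = DaskExecutor(cluster_class=make_cluster, adapt_kwargs={"minimum": 1, "maximum": 4})
```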
k
So I think we can't necessarily pass env vars to the executor through the `RunConfig`, because a lot of people use Dask clusters that already exist. I do think there have been discussions around serializing the executor along with the Flow, but I'm not entirely sure; I'd have to check with the team on this.
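To illustrate that point, pointing the executor at an existing cluster looks something like the sketch below (the scheduler address is a placeholder); in that case there is no pod spec for run-config env vars to flow into:
```python
from prefect.executors import DaskExecutor

# Connect to a long-running Dask scheduler instead of creating a cluster
# per flow run; "tcp://dask-scheduler:8786" is a placeholder address.
executor = DaskExecutor(address="tcp://dask-scheduler:8786")
```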
@Marvin open “Registering multiple Flows in same script makes it hard to attach different Executors”