# ask-community
Joe Schmid
We're a bit behind on Prefect versions (0.13.19) because we've been using Environments (especially DaskCloudProviderEnvironment), so we're ramping up on Run Configs. With Run Configs, can we:
1. Examine the parameters of a Flow run at the start of execution, calculate the number of Dask workers based on a parameter value, and create an ephemeral Dask cluster to run that Flow on?
2. Alternatively, associate a Run Config with a schedule, e.g. run this Flow at midnight with 12 workers and run it again at 4am with 20 workers, etc.?
Kevin Kho
Hey @Joe Schmid, I believe the answer is a bit tricky here, and we need to go over a couple of concepts. First, you can use a Dask cluster either as a Resource Manager or as an Executor. If you're using it as a resource manager, this becomes easier. If you're using it as an executor, I think you need to create a function and do
```python
flow.executor = create_dask_executor()
```
Inside this function you can then
```python
import prefect
from prefect.executors import DaskExecutor

def create_dask_executor():
    # "n_workers" is an illustrative flow parameter name
    params = prefect.context.get("parameters", {})
    return DaskExecutor(cluster_kwargs={"n_workers": params.get("n_workers", 4)})
```
This is possible because the executor is not serialized with the flow; it gets created when the flow is loaded from storage at run time. Because the flow run context isn't available at that point, though, I don't think you can directly pass parameters in. On number 2, the RunConfig does not take an executor. Clocks have parameters, and schedules are made up of multiple clocks, so you can attach your parameters (and maybe change your executor) that way, but it won't be in the RunConfig.
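A rough sketch of that clock approach, assuming Prefect 1.x and an illustrative n_workers flow parameter:
```python
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Each clock attaches its own parameter defaults to the runs it generates.
schedule = Schedule(clocks=[
    CronClock("0 0 * * *", parameter_defaults={"n_workers": 12}),  # midnight run
    CronClock("0 4 * * *", parameter_defaults={"n_workers": 20}),  # 4am run
])
flow.schedule = schedule  # `flow` is your existing Flow object
```
The clocks only set parameter values for each scheduled run; whatever sizes the cluster still has to read that parameter at flow-run time.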
Check this thread. I told them it couldn't be done and I was wrong; they had a clever solution.
If that function approach doesn’t work, I think you need to do this.
Actually, I just tried the function approach and unfortunately it doesn't work, because the Parameters aren't in the context yet when the executor is created (a Parameter is a task, and tasks haven't run at that point). You need to follow the thread above.
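For reference, one pattern that works in Prefect 1.x for this (and may be what the thread lands on) is to pass a callable as cluster_class to DaskExecutor; the callable only runs once the flow run starts, when parameters are available in context. A minimal sketch, with FargateCluster and the n_workers parameter as illustrative assumptions:
```python
import prefect
from prefect.executors import DaskExecutor

def make_cluster():
    # Called at flow-run time, so flow parameters are available in context here.
    from dask_cloudprovider.aws import FargateCluster  # stand-in for your cluster class
    n_workers = prefect.context.parameters.get("n_workers", 8)
    return FargateCluster(n_workers=n_workers)

flow.executor = DaskExecutor(cluster_class=make_cluster)  # `flow` is your existing Flow object
```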
Joe Schmid
@Kevin Kho Thanks! Wow, the workaround in that thread is interesting and I think it can probably work for us. (It's not quite the same but it might be close enough.) As feedback, we would definitely like to have a less hacky way to do what we currently do in DaskCloudProviderEnvironment to dynamically size a cluster based on parameters and/or schedules.
Kevin Kho
For sure will bring it up.
g
@Kevin Kho I can’t see the thread from @Martin with the solution. How can I read the thread? I have the same problem to solve.
I found it 🎉: https://github.com/PrefectHQ/prefect/issues/4942 Now I should be able to figure out how to modify the code to pass environment variables at runtime.
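A minimal sketch of that environment-variable idea, assuming Prefect 1.x and a made-up N_WORKERS variable (the linked issue may differ in its details):
```python
import os
from prefect.executors import DaskExecutor
from prefect.run_configs import UniversalRun

def make_cluster():
    # Runs inside the flow-run environment, so env vars from the RunConfig are visible.
    from dask.distributed import LocalCluster  # stand-in for your cluster class
    return LocalCluster(n_workers=int(os.environ.get("N_WORKERS", "4")))

flow.run_config = UniversalRun(env={"N_WORKERS": "12"})  # env can be overridden per flow run
flow.executor = DaskExecutor(cluster_class=make_cluster)
```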