# ask-community
m
Hi folks, this is a question of passing a custom context to a prefect flow run. More specifically about passing a custom context with a nested dictionary.
We have a flow that obtains its default context from environment variables defined in the run config. See below for an example of the environment variables in a run config template:
'containers': [
	{
		'env': [
   			{'name': 'PREFECT__CONTEXT__worker__memory_gb', 'value': '3.6'},
   			{'name': 'PREFECT__CONTEXT__worker__cpu', 'value': '0.75'},
   		]
   }
]
We then make use of the context as such (in this case to dynamically create our `KubeCluster`):
create_worker_pod_yaml(
	memory_gb=prefect.context.worker.memory_gb,
	cpu=prefect.context.worker.cpu,
	)
Sometimes we would like to adjust `worker.memory_gb` for a flow_run, so intuitively we would like to update our default context by passing in an updated context:
updated_context = {
	"worker": {
		"memory_gb": 1.0
	}
}
client.create_flow_run(flow_id=flow_id_to_use, context=updated_context)
Sadly this fails for two reasons:
- the full worker dict is overwritten (i.e. the updated context dictionary is not merged smartly; the equivalent of a simple `dict.update` seems to be performed)
- when the updated_context is deserialized, the nested worker dictionary is not transformed to a Box dict, so even if we pass the full worker dictionary, i.e.
updated_context = {
	"worker": {
		"memory_gb": 1.0,
		"cpu": 1.0
	}
}
we would still get a failure:
<Failed: "Unexpected error: AttributeError("'dict' object has no attribute 'memory_gb'")">
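The two failure modes described above can be reproduced without Prefect. The sketch below is only an illustration: `SingleLevelDotDict` is a hypothetical stand-in for a context object that offers attribute access on its own keys, not Prefect's actual class, and the default values are taken from the run config example.

```python
class SingleLevelDotDict(dict):
    """Hypothetical stand-in: a dict whose own keys are readable as attributes."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None


# Defaults as they might look once loaded from the env variables.
defaults = SingleLevelDotDict(worker=SingleLevelDotDict(memory_gb=3.6, cpu=0.75))

# A shallow update replaces the whole "worker" entry with a plain dict.
context = SingleLevelDotDict(defaults)
context.update({"worker": {"memory_gb": 1.0}})

print("cpu" in context.worker)  # False: the default cpu value is gone

try:
    context.worker.memory_gb  # the nested value is a plain dict now
except AttributeError as exc:
    print(exc)  # 'dict' object has no attribute 'memory_gb'

print(context.worker["memory_gb"])  # 1.0: item access still works
```

This is why item access on the nested value keeps working, while attribute access on it fails.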
To address this we employ the following workaround for now: pass the full worker dictionary and update our code so that it makes use of `__getitem__`, i.e. so it looks like this:
create_worker_pod_yaml(
	memory_gb=prefect.context.worker["memory_gb"],
	cpu=prefect.context.worker["cpu"],
	)
I am wondering if we should avoid making use of nested context dicts altogether (which is a bit of a bummer given it is nice to keep the context organized) or if the prefect context resolution could be improved (assuming I understand it correctly)
k
Hey @Marwan Sarieddine , I think I understand. Of all the things you listed, it seems the full dictionary should at least work, but I'll confirm with the team
It would help me if you have a small reproducible example though
m
Hi @Kevin Kho thank you for responding about this late in the day and for going through my thread here. Sure I will try to narrow this down to a more reproducible example …
"it seems the full dictionary should at least work"
Yes, I know this works, but it is far from ideal for us
k
I see some users doing this with parameters? Why did you choose context versus parameters?
m
Our use case is to dynamically create the dask kubernetes cluster (`KubeCluster`) from our run config, i.e. this is at a stage before the flow is run, when parameters are still not available
k
I see
m
I know prefect already has the ability to merge dictionaries smartly when resolving the context - just seems in this particular case it fails to do so
@Kevin Kho please see these two files to reproduce this issue, the runner and the flow (I had to split them like this so I can specify the env vars and use subprocess)
Here is the code in case you don’t want to download the script files
runner.py
import pathlib
import os
import subprocess

# I am mimicking what our run config is doing with env variables.
os.environ["PREFECT__CONTEXT__a__b"] = "1"
os.environ["PREFECT__CONTEXT__a__c"] = "1"

CURR_DIR = pathlib.Path(__file__).parent
subprocess.run(["python", str(CURR_DIR / "simple_flow.py")])

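# check=True makes subprocess.run raise CalledProcessError when the child
# exits with a non-zero code; without it the except branch is never reached.
try:
    subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1"], check=True)
except subprocess.CalledProcessError:
    print("this failed because a.c got overwritten")

try:
    subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--c", "1"], check=True)
except subprocess.CalledProcessError:
    print("this failed because a.b got overwritten")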

subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1", "--c", "2"])
simple_flow.py
from prefect.core.flow import Flow
from prefect.utilities.tasks import task
import prefect
import click


@task
def work_with_context():
    return prefect.context.a["b"] + prefect.context.a["c"]


with Flow("simple-flow") as flow:
    out = work_with_context()


@click.command(name="run")
@click.option(
    "--b",
    type=int,
    default=None,
    help="the value of a.b - optional",
)
@click.option(
    "--c",
    type=int,
    default=None,
    help="the value of a.c - optional",
)
def run_flow(b: int = None, c: int = None):
    if b is None and c is None:
        # this will work with both __getitem__ and __getattr__
        # i.e. with both prefect.context.a["b"] and prefect.context.a.b
        state = flow.run()
    elif c is None:
        state = flow.run(context={"a": {"b": b}})  # this fails because a.c is overwritten
    elif b is None:
        state = flow.run(context={"a": {"c": c}})  # this fails because a.b is overwritten
    else:
        # this works fine given we use __getitem__
        # i.e. prefect.context.a["b"] instead of prefect.context.a.b
        state = flow.run(context={"a": {"b": b, "c": c}})

    # flow.run() returns a State instead of raising, so report failure via the exit code
    if state.is_failed():
        raise SystemExit(1)


if __name__ == "__main__":
    run_flow()
apologies if it is still too much - but that’s as minimal as I could go …
run `python runner.py` to see how this plays out …
in hindsight “overwritten” might be the wrong word to use, I think I mean “deleted”
k
No this helps a lot. I can take it and simplify/open an issue if need be 🙂
m
awesome - thank you
k
I’ll be checking this out more thoroughly today
m
thanks
let me know if you require any additional input from my end
z
Hey @Marwan Sarieddine -- we can take a look at parsing nested dicts in the context into a box more consistently. I think `Box` usually handles this recursive conversion so I'm surprised it's not working as is, may be a simple fix.
@Marvin open "Nested dicts from env are not cast to `Box` in `prefect.context`"
m
Hi @Zanie - thanks for following up on this. I think a more consistent behavior when specifying a context using env variables or passing a python dictionary would be a nice improvement here
It would also be nice to allow for “nested updates” of the context by performing a smart dictionary merge instead of a simple update
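For reference, a "smart" merge of this kind could look roughly like the sketch below. `deep_merge` is a hypothetical helper written for illustration and is not part of Prefect; the values are the ones used earlier in this thread.

```python
import copy
from collections.abc import Mapping


def deep_merge(base, updates):
    """Return a new dict where nested mappings are merged key by key."""
    merged = copy.deepcopy(dict(base))
    for key, value in updates.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are mappings: merge them instead of replacing.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


defaults = {"worker": {"memory_gb": 3.6, "cpu": 0.75}}
override = {"worker": {"memory_gb": 1.0}}

merged = deep_merge(defaults, override)
print(merged)  # {'worker': {'memory_gb': 1.0, 'cpu': 0.75}}
```

Until something like this exists in the context resolution itself, the same helper could be applied on the caller's side to build the full nested dictionary before it is passed as `context`.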
z
@Marvin open "Nested context dicts from env are not merged recursively"
🙂
m
thank you!
z
After looking at the code, this is not how the context works at all. There is never a recursive merge and it is always a single-level `DotDict`; there is no casting of deeper data. This could have some pretty serious implications since the context is used widely internally.
I'm not sure we'll be able to fulfill this request but we can look into it further.
Just as a heads up, recursive merging here would be a breaking change for some people so I'm not sure we will be able to do this soon. I'd recommend using flat keys instead of a nested dict to get the behavior you want for now.
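A rough sketch of what the flat-key approach could look like. The key names (`worker_memory_gb`, `worker_cpu`) and the `flatten` helper are assumptions made for illustration, not an existing convention or Prefect API. With flat keys, a plain single-level update is enough to keep the other defaults.

```python
def flatten(nested, sep="_", prefix=""):
    """Collapse nested dicts into one level, e.g. worker.cpu -> worker_cpu."""
    flat = {}
    for key, value in nested.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, sep=sep, prefix=name))
        else:
            flat[name] = value
    return flat


# Hypothetical flat defaults (what flat env-defined keys would provide).
defaults = flatten({"worker": {"memory_gb": 3.6, "cpu": 0.75}})
override = flatten({"worker": {"memory_gb": 1.0}})

# A single-level update now only touches the key that was overridden.
context = dict(defaults)
context.update(override)
print(context)  # {'worker_memory_gb': 1.0, 'worker_cpu': 0.75}
```

The trade-off is the one mentioned earlier in the thread: the context is less organized, but partial overrides behave as expected.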
m
I see - I suppose that makes sense from a dev experience perspective … thanks for looking into it