m

    Marwan Sarieddine

    1 year ago
    Hi folks, this is a question of passing a custom context to a prefect flow run. More specifically about passing a custom context with a nested dictionary.
    We have a flow that obtains its default context from environment variables defined in the run config see below for an example run config template’s environment variables:
    'containers': [
    	{
    		'env': [
       			{'name': 'PREFECT__CONTEXT__worker__memory_gb', 'value': '3.6'},
       			{'name': 'PREFECT__CONTEXT__worker__cpu', 'value': '0.75'},
       		]
       }
    ]
    we then make use of the context as such (in this case to dynamically create our KubeCluster:
    create_worker_pod_yaml(
    	memory_gb=prefect.context.worker.memory_gb
    	cpu=prefect.context.worker.cpu
    	)
    Sometimes we would like to adjust the
    worker.memory_gb
    for a flow_run so intuitively we would like to update our default context by passing in an updated context
    updated_context = {
    	"worker": {
    		"memory_gb": 1.0
    	}
    }
    client.create_flow_run(flow_id=flow_id_to_use, context=updated_context)
    Sadly this fails for two reasons: - the full worker dict will be overwritten, (i.e. the updated context dictionary is not merged smartly but the equivalent of a simple dictionary.update seems to be performed) - when the updated_context is deserialized it, the nested worker dictionary is not transformed to a Box Dict so even if we pass the full worker dictionary - i.e.
    updated_context = {
    	"worker": {
    		"memory_gb": 1.0
    		"cpu": 1.0
    	}
    }
    we would still get a failure:
    <Failed: "Unexpected error: AttributeError("'dict' object has no attribute 'memory_gb'")">
    To address this we employ the following workaround for now (pass the full worker dictionary and update our code so that it makes use of
    __getitem__
    - i.e so it looks like this:
    create_worker_pod_yaml(
    	memory_gb=prefect.context.worker["memory_gb"]
    	cpu=prefect.context.worker["cpu"]
    	)
    I am wondering if we should avoid making use of nested context dicts altogether (which is a bit of a bummer given it is nice to keep the context organized) or if the prefect context resolution could be improved (in case I understand it correctly)
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @Marwan Sarieddine , I think I understand. Of all the things you listed, it seems the full dictionary should at least work, but I'll confirm with the team
    It would help me if you have a small reproducible example though
    m

    Marwan Sarieddine

    1 year ago
    Hi @Kevin Kho thank you for responding about this late in the day and for going through my thread here. Sure I will try to narrow this down to a more reproducible example …
    it seems the full dictionary should at least work
    Yes I know this works but it is far from ideal for us
    Kevin Kho

    Kevin Kho

    1 year ago
    I see some users doing this with parameters? Why did you choose context versus parameters?
    m

    Marwan Sarieddine

    1 year ago
    Our usecase is to dynamically create the dask kubernetes cluster
    KubeCluster
    from our run config - i.e. this is at a stage before the flow is run when parameters are still not available
    Kevin Kho

    Kevin Kho

    1 year ago
    I see
    m

    Marwan Sarieddine

    1 year ago
    I know prefect already has the ability to merge dictionaries smartly when resolving the context - just seems in this particular case it fails to do so
    @Kevin Kho please see these two files to reproduce this issue, the runner and the flow (I had to split them like this so I can specify the env vars and use subprocess)
    Here is the code in case you don’t want to download the script files
    runner.py
    import pathlib
    import os
    import subprocess
    
    # I am mimicking what our run config is doing with env variables.
    os.environ["PREFECT__CONTEXT__a__b"] = "1"
    os.environ["PREFECT__CONTEXT__a__c"] = "1"
    
    CURR_DIR = pathlib.Path(__file__).parent
    subprocess.run(["python", str(CURR_DIR / "simple_flow.py")])
    
    try:
        subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1"])
    except:
        print("this failed because a.c got overwritten")
    
    try:
        subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--c", "1"])
    except:
        print("this failed because a.b got overwritten")
    
    subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1", "--c", "2"])
    simple_flow.py
    from prefect.core.flow import Flow
    from prefect.utilities.tasks import task
    import prefect
    import click
    
    
    @task
    def work_with_context():
        return prefect.context.a["b"] + prefect.context.a["c"]
    
    
    with Flow("simple-flow") as flow:
        out = work_with_context()
    
    
    @click.command(name="run")
    @click.option(
        "--b",
        type=int,
        default=None,
        help="the value of a - optional",
    )
    @click.option(
        "--c",
        type=int,
        default=None,
        help="the value of a - optional",
    )
    def run_flow(b: int = None, c: int = None):
        if b is None and c is None:
            # this will work with both __getitem__ and __getattr__
            # i.e. with both prefect.context.a["b"] and prefect.context.a.b
            flow.run()
        else:
            if b is not None and c is None:
                flow.run(context={"a": {"b": b}})  # this fails because a.c is overwritten
            elif c is not None and b is None:
                flow.run(context={"a": {"c": c}})  # this fails because a.b is overwritten
            else:
                # this works fine given we are use __getitem__
                # i.e. prefect.context.a["b"] instead of prefect.context.a.b
                flow.run(context={"a": {"b": b, "c": c}})
    
    
    if __name__ == "__main__":
        run_flow()
    apologies if it is still too much - but that’s as minimal as I could go …
    run
    python runner.py
    to see how this play out …
    in hindsight “overwritten” might be the wrong word to use, I think I mean “deleted”
    Kevin Kho

    Kevin Kho

    1 year ago
    No this helps a lot. I can take it and simplify/open an issue if need be 🙂
    m

    Marwan Sarieddine

    1 year ago
    awesome - thank you
    Kevin Kho

    Kevin Kho

    1 year ago
    I’ll be checking this out more thoroughly today
    m

    Marwan Sarieddine

    1 year ago
    thanks
    let me know if you require any additional input from my end
    Michael Adkins

    Michael Adkins

    1 year ago
    Hey @Marwan Sarieddine -- we can take a look at parsing nested dicts in the context into a box more consistently. I think
    Box
    usually handles this recursive conversion so I'm surprised it's not working as is, may be a simple fix.
    @Marvin open "Nested dicts from env are not cast to
    Box
    in prefect.context"
    Marvin

    Marvin

    1 year ago
    m

    Marwan Sarieddine

    1 year ago
    Hi @Michael Adkins - thanks for following up on this. I think a more consistent behavior when specifying a context using env variables or passing a python dictionary would be a nice improvement here
    It would also be nice to allow for “nested updates” of the context by performing a smart dictionary merge instead of a simple update
    Michael Adkins

    Michael Adkins

    1 year ago
    @Marvin open "Nested context dicts from env are not merged recursively"
    🙂
    Marvin

    Marvin

    1 year ago
    m

    Marwan Sarieddine

    1 year ago
    thank you!
    Michael Adkins

    Michael Adkins

    1 year ago
    After looking at the code, this is not how the context works at all. There is never a recursive merge and it is always a single-level
    DotDict
    , there is no casting of deeper data. This could have some pretty serious implications since the context is used widely internally.
    I'm not sure we'll be able to fulfill this request but we can look into it further.
    Just as a heads up, recursive merging here would be a breaking change for some people so I'm not sure we will be able to do this soon. I'd recommend using flat keys instead of a nested dict to get the behavior you want for now.
    m

    Marwan Sarieddine

    1 year ago
    I see - I suppose that makes sense from a dev experience perspective … thanks for looking into it