Marwan Sarieddine
05/20/2021, 9:28 PMMarwan Sarieddine
05/20/2021, 9:29 PM'containers': [
{
'env': [
{'name': 'PREFECT__CONTEXT__worker__memory_gb', 'value': '3.6'},
{'name': 'PREFECT__CONTEXT__worker__cpu', 'value': '0.75'},
]
}
]
we then make use of the context as such (in this case to dynamically create our `KubeCluster`:
create_worker_pod_yaml(
memory_gb=prefect.context.worker.memory_gb
cpu=prefect.context.worker.cpu
)
Marwan Sarieddine
05/20/2021, 9:30 PMworker.memory_gb
for a flow_run so intuitively we would like to update our default context by passing in an updated context
updated_context = {
"worker": {
"memory_gb": 1.0
}
}
client.create_flow_run(flow_id=flow_id_to_use, context=updated_context)
Marwan Sarieddine
05/20/2021, 9:31 PMupdated_context = {
"worker": {
"memory_gb": 1.0
"cpu": 1.0
}
}
we would still get a failure:
<Failed: "Unexpected error: AttributeError("'dict' object has no attribute 'memory_gb'")">
To address this we employ the following workaround for now (pass the full worker dictionary and update our code so that it makes use of __getitem__
- i.e so it looks like this:
create_worker_pod_yaml(
memory_gb=prefect.context.worker["memory_gb"]
cpu=prefect.context.worker["cpu"]
)
Marwan Sarieddine
05/20/2021, 9:31 PMKevin Kho
Kevin Kho
Marwan Sarieddine
05/20/2021, 11:21 PMMarwan Sarieddine
05/20/2021, 11:22 PMit seems the full dictionary should at least workYes I know this works but it is far from ideal for us
Kevin Kho
Marwan Sarieddine
05/20/2021, 11:25 PMKubeCluster
from our run config - i.e. this is at a stage before the flow is run when parameters are still not availableKevin Kho
Marwan Sarieddine
05/20/2021, 11:27 PMMarwan Sarieddine
05/21/2021, 12:08 AMMarwan Sarieddine
05/21/2021, 12:09 AMrunner.py
import pathlib
import os
import subprocess
# I am mimicking what our run config is doing with env variables.
os.environ["PREFECT__CONTEXT__a__b"] = "1"
os.environ["PREFECT__CONTEXT__a__c"] = "1"
CURR_DIR = pathlib.Path(__file__).parent
subprocess.run(["python", str(CURR_DIR / "simple_flow.py")])
try:
subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1"])
except:
print("this failed because a.c got overwritten")
try:
subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--c", "1"])
except:
print("this failed because a.b got overwritten")
subprocess.run(["python", str(CURR_DIR / "simple_flow.py"), "--b", "1", "--c", "2"])
Marwan Sarieddine
05/21/2021, 12:09 AMsimple_flow.py
from prefect.core.flow import Flow
from prefect.utilities.tasks import task
import prefect
import click
@task
def work_with_context():
return prefect.context.a["b"] + prefect.context.a["c"]
with Flow("simple-flow") as flow:
out = work_with_context()
@click.command(name="run")
@click.option(
"--b",
type=int,
default=None,
help="the value of a - optional",
)
@click.option(
"--c",
type=int,
default=None,
help="the value of a - optional",
)
def run_flow(b: int = None, c: int = None):
if b is None and c is None:
# this will work with both __getitem__ and __getattr__
# i.e. with both prefect.context.a["b"] and prefect.context.a.b
flow.run()
else:
if b is not None and c is None:
flow.run(context={"a": {"b": b}}) # this fails because a.c is overwritten
elif c is not None and b is None:
flow.run(context={"a": {"c": c}}) # this fails because a.b is overwritten
else:
# this works fine given we are use __getitem__
# i.e. prefect.context.a["b"] instead of prefect.context.a.b
flow.run(context={"a": {"b": b, "c": c}})
if __name__ == "__main__":
run_flow()
Marwan Sarieddine
05/21/2021, 12:09 AMMarwan Sarieddine
05/21/2021, 12:10 AMpython runner.py
to see how this play out …Marwan Sarieddine
05/21/2021, 12:13 AMKevin Kho
Marwan Sarieddine
05/21/2021, 12:36 AMKevin Kho
Marwan Sarieddine
05/24/2021, 2:01 PMMarwan Sarieddine
05/24/2021, 2:01 PMZanie
Box
usually handles this recursive conversion so I'm surprised it's not working as is, may be a simple fix.Zanie
Box
in `prefect.context`"Marvin
05/24/2021, 3:29 PMMarwan Sarieddine
05/24/2021, 3:34 PMMarwan Sarieddine
05/24/2021, 3:35 PMZanie
Zanie
Marvin
05/24/2021, 3:38 PMMarwan Sarieddine
05/24/2021, 3:42 PMZanie
DotDict
, there is no casting of deeper data. This could have some pretty serious implications since the context is used widely internally.Zanie
Zanie
Marwan Sarieddine
05/24/2021, 4:05 PM