# ask-community
k
related to this thread, I’m trying to set a key in the context in one task, and then read it in a subsequent task. Is that something that should work? Here is the task that sets the key:
Copy code
import prefect
from prefect import Task

class SetTrDate(Task):
    def run(self, tr_date: str):
        prefect.context.tr_date = tr_date or get_last_tr_date()
and then I read it in subsequent tasks with:
Copy code
prefect.context.get('tr_date')
The subsequent tasks seem to think that it’s set to None
Should this work?
k
Hey @Kevin Weiler, no, this should not work. The context is not really mutable once it’s set - any edits to it don’t live outside the task. If you need something across tasks but don’t want to pass it explicitly, maybe you can look into the KV Store? I think the best practice is explicitly passing it across tasks though, for clearer logic.
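For what it’s worth, here’s a minimal sketch of the KV Store route (Prefect 1.x with a Cloud backend; set_key_value and get_key_value live in prefect.backend):
Copy code
from prefect import task
from prefect.backend import set_key_value, get_key_value

@task
def store_tr_date(tr_date: str):
    # persist the value in Prefect Cloud under a well-known key
    set_key_value(key="tr_date", value=tr_date)

@task
def read_tr_date():
    # any downstream task (or a later flow run) can read it back
    return get_key_value(key="tr_date")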
Some users add stuff to the context to retrieve it from state handlers. This is also not particularly advised, but it’s quite hard to get data from a task into a state handler otherwise. That’s more a shortcoming of the state handler design than a case for making the context mutable.
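To illustrate that pattern (a sketch only - it assumes the context edit made inside the task is still visible when the handler fires, which is exactly the fragile part being advised against):
Copy code
import prefect
from prefect import task

def alert_on_failure(task_obj, old_state, new_state):
    # standard Prefect 1.x state handler signature
    if new_state.is_failed():
        # read a value the task stashed in context during its run
        tr_date = prefect.context.get("tr_date")
        print(f"task failed for tr_date={tr_date}")
    return new_state

@task(state_handlers=[alert_on_failure])
def risky_task(tr_date: str):
    prefect.context.tr_date = tr_date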
k
the problem with explicitly passing it to the tasks is that I’ll have a fairly complex graph where every single node needs this date string, and so the schematic is gonna be a mess. What I basically want is a configuration that is:
1. global to the flow
2. settable by a parameter
3. if not set, determined at flow runtime (as opposed to deploy time)
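For reference, requirements 2 and 3 can mostly be met by resolving a Parameter at runtime in one task and passing its result around explicitly (a sketch; get_last_tr_date is the helper from the snippet above):
Copy code
from prefect import task, Flow, Parameter

@task
def resolve_tr_date(tr_date):
    # runs at flow runtime, so the default is computed per run, not at deploy time
    return tr_date or get_last_tr_date()

@task
def downstream(tr_date):
    print(tr_date)

with Flow("tr-date-example") as flow:
    resolved = resolve_tr_date(Parameter("tr_date", default=None, required=False))
    downstream(resolved)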
KV Store is only available for Cloud I think
although - it’s not a bad idea - I could just write the string to a file in the container and read it back in each task
k
Oh yeah you’re right. You might be able to use os.environ? An environment variable set there lives only for the Python process. Especially if the value is small.
k
yeah - would need to use threading rather than multiprocessing for that to work, which is probably fine
k
Do you need multiprocessing specifically over using the LocalDaskExecutor?
k
no - just LocalDaskExecutor(scheduler="threads") or something, right?
maybe I don’t understand how that works
k
Yep
Copy code
flow.executor = LocalDaskExecutor()
so that your mapped tasks will already be parallelized - no need to create a multiprocessing pool yourself. Local Dask is a pool in itself
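If you want control over the pool, both the scheduler and the worker count are constructor arguments (a sketch; num_workers is optional):
Copy code
from prefect.executors import LocalDaskExecutor

# thread pool: workers share the process, so os.environ is shared too
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# process pool: each worker has its own copy of the environment
# flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)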
k
right - but how do environment variables work across those
i’m going to try it, NBD if it doesn’t work - a file would be fine
thanks so much for helping me brainstorm
k
No problem!
Env variables are carried over I believe as long as you’re using Local
k
i’m going to reference these threads in a feature request too. I think having some global param (GlobalParameter maybe?) could be useful
also - tried it out, and FYI - this only works for
Copy code
LocalDaskExecutor(scheduler="threads")
and does NOT work for
Copy code
LocalDaskExecutor(scheduler="processes")
threads is the default too - so I should probably just use that unless there is a reason not to
k
That makes sense I think - with processes, each Dask worker gets its own copy of the environment, so an os.environ change in one task isn’t visible to tasks in other workers, while threads share the parent process’s memory. Was testing it myself just now. No specific reason to prefer processes, except that threads can sometimes hang over larger maps and processes would be more stable.
Here is my test btw:
Copy code
import os

import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor


@task
def set_environ(x):
    # set the value as an env var in the flow-run process
    os.environ["test"] = str(x)


@task
def some_function(x):
    logger = prefect.context.get("logger")
    # read the env var back in a downstream (mapped) task
    logger.info(int(os.environ["test"]) + x)
    return x


with Flow(name="Example Flow") as flow:
    myparameter = Parameter("test", 123)
    a = set_environ(myparameter)

    some_function.map(x=[1, 2, 3, 4], upstream_tasks=[unmapped(a)])

flow.executor = LocalDaskExecutor(scheduler="threads")

flow.run()
k
yeah - in Python I almost always go with processes because of the GIL
This is a pretty lightweight DAG - all it does is issue API calls to Nomad (similar to k8s) and then poll periodically for state, so it’s almost entirely I/O-bound and the GIL shouldn’t matter much
basically just a bunch of requests calls
I’m gonna go with it like this for now
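For illustration, that kind of task could look roughly like this (a sketch - the Nomad address and the status handling are assumptions; check the endpoints against your cluster’s jobs API):
Copy code
import time

import requests
from prefect import task

NOMAD_ADDR = "http://localhost:4646"  # hypothetical cluster address

@task
def poll_nomad_job(job_id: str, interval: int = 10) -> str:
    # poll the Nomad jobs API until the job reaches a terminal state
    while True:
        resp = requests.get(f"{NOMAD_ADDR}/v1/job/{job_id}")
        resp.raise_for_status()
        status = resp.json().get("Status")
        if status == "dead":  # terminal status for a finished Nomad job
            return status
        # the thread mostly sleeps/waits on I/O here, so the GIL isn't a bottleneck
        time.sleep(interval)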