Kevin Weiler  09/17/2021, 8:58 PM
class SetTrDate(Task):
    def run(self, tr_date: str):
        prefect.context.tr_date = tr_date or get_last_tr_date()
and then I read it in subsequent tasks with:
prefect.context.get('tr_date')
The subsequent tasks seem to think that it's set to None.
Should this work?
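(A minimal sketch, not from the thread, of the other common Prefect 1.x pattern here: instead of mutating prefect.context at runtime, return the resolved date from a task and pass it downstream as a data dependency. get_last_tr_date is stubbed out below since only its name appears in the question.)

import prefect
from prefect import task, Flow, Parameter

def get_last_tr_date():
    # stand-in for the helper referenced in the question
    return "2021-09-16"

@task
def resolve_tr_date(tr_date: str):
    # fall back to the (assumed) last tr_date when none is supplied
    return tr_date or get_last_tr_date()

@task
def downstream(tr_date: str):
    prefect.context.get("logger").info(f"running for {tr_date}")

with Flow("tr-date-example") as flow:
    tr_date = resolve_tr_date(Parameter("tr_date", default=None))
    downstream(tr_date)

flow.run()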
Kevin Kho
Kevin Weiler  09/17/2021, 9:05 PM
Kevin Weiler  09/17/2021, 9:06 PM
Kevin Weiler  09/17/2021, 9:07 PM
Kevin Kho
os.environ? An environment variable set there is temporary for the Python process. Especially if it's small
Kevin Weiler  09/17/2021, 9:08 PM
Kevin Kho
Kevin Weiler  09/17/2021, 9:10 PM
LocalDaskExecutor(scheduler="threads")
or something right?
Kevin Weiler  09/17/2021, 9:10 PM
Kevin Kho
flow.executor = LocalDaskExecutor()
so that your mapped tasks will already be parallelized, so there's no need to create the multiprocessing pool. Local Dask is a pool in itself.
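(A minimal sketch of that suggestion, assuming the work previously sent to a multiprocessing pool lives in a hypothetical process_chunk task: .map() fans out one task run per input, and the LocalDaskExecutor runs those task runs in parallel.)

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def process_chunk(chunk):
    # hypothetical stand-in for the work that was going to a multiprocessing pool
    return sum(chunk)

with Flow("mapped-example") as flow:
    results = process_chunk.map([[1, 2], [3, 4], [5, 6]])

flow.executor = LocalDaskExecutor()  # scheduler="threads" by default
flow.run()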
Kevin Weiler  09/17/2021, 9:17 PM
Kevin Weiler  09/17/2021, 9:18 PM
Kevin Weiler  09/17/2021, 9:18 PM
Kevin Kho
Kevin Kho
Kevin Weiler  09/17/2021, 9:19 PM
Kevin Weiler  09/17/2021, 9:22 PM
[it works for] LocalDaskExecutor(scheduler="threads")
and does NOT work for
LocalDaskExecutor(scheduler="processes")
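(That split is what you would expect from plain Python, independent of Prefect: threads share the parent process's os.environ, while a worker process only mutates its own copy. A small illustration using concurrent.futures:)

import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def set_var():
    os.environ["TEST"] = "123"

def read_var():
    return os.environ.get("TEST")

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as pool:
        pool.submit(set_var).result()
    print(read_var())  # "123": the thread mutated the shared os.environ

    os.environ.pop("TEST", None)
    with ProcessPoolExecutor(max_workers=1) as pool:
        pool.submit(set_var).result()
    print(read_var())  # None: the mutation happened in the child process's copy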
Kevin Weiler  09/17/2021, 9:22 PM
threads is the default too - so I should probably just use that unless there is a reason not to
Kevin Kho
Kevin Kho
import prefect
from prefect import task, Flow, Parameter, case, unmapped
import os
from prefect.executors import LocalDaskExecutor

@task()
def set_environ(x):
    # store the parameter value in an environment variable for downstream tasks
    os.environ["test"] = str(x)
    return

@task()
def some_function(x):
    logger = prefect.context.get("logger")
    logger.info(int(os.environ["test"]) + x)
    return x

with Flow(name="Example Flow") as flow:
    myparameter = Parameter("test", 123)
    a = set_environ(myparameter)
    some_function.map(x=[1, 2, 3, 4], upstream_tasks=[unmapped(a)])

flow.executor = LocalDaskExecutor(scheduler="threads")
flow.run()
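(Tying Kevin Kho's example back to the original question, a sketch of the same environment-variable pattern applied to tr_date; get_last_tr_date is stubbed out since only its name appears in the thread.)

import os
import prefect
from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalDaskExecutor

def get_last_tr_date():
    # stand-in for the helper from the original question
    return "2021-09-16"

@task
def set_tr_date(tr_date):
    os.environ["TR_DATE"] = tr_date or get_last_tr_date()

@task
def use_tr_date(x):
    prefect.context.get("logger").info(f"{x}: {os.environ['TR_DATE']}")

with Flow("tr-date-env-example") as flow:
    a = set_tr_date(Parameter("tr_date", default=None))
    use_tr_date.map(x=[1, 2, 3], upstream_tasks=[unmapped(a)])

flow.executor = LocalDaskExecutor(scheduler="threads")  # "threads", not "processes", per the thread above
flow.run()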
Kevin Weiler  09/17/2021, 9:36 PM
Kevin Weiler  09/17/2021, 9:36 PM
Kevin Weiler  09/17/2021, 9:37 PM
requests
Kevin Weiler  09/17/2021, 9:37 PM