Sylvain Hazard
10/08/2021, 2:49 PMfrom prefect import task, Flow
import prefect
import random
from multiprocessing.dummy import Pool as ThreadPool
@task
def random_number():
return random.randint(0, 100)
def plus_one_f(x):
prefect.context.get("logger").info(f"Plus One F : {x}")
return x+1
@task
def plus_one(x):
a = plus_one_f(x)
with ThreadPool(8) as pool:
results = pool.map(plus_one_f, [x])
with Flow('My Functional Flow') as flow:
r = random_number()
y = plus_one(x=r)
flow.run()
Kevin Kho
10/08/2021, 2:50 PMSylvain Hazard
10/08/2021, 2:52 PMZach Angell
10/08/2021, 2:59 PMprefect.context
needs to be threadsafe, so any threads you spawn yourself will need custom logic to pass the context.
As Kevin mentioned, Prefect's executors have logic to ensure context is passed around correctly to threads/processes