Got another quick question: it seems that running a method in a ThreadPool makes it lose access to the current context, and specifically the Prefect logger. Is there an easy workaround for this?
Sylvain Hazard
10/08/2021, 2:50 PM
Here's a way to reproduce:
from prefect import task, Flow
import prefect
import random
from multiprocessing.dummy import Pool as ThreadPool

@task
def random_number():
    return random.randint(0, 100)

def plus_one_f(x):
    prefect.context.get("logger").info(f"Plus One F : {x}")
    return x+1

@task
def plus_one(x):
    a = plus_one_f(x)
    with ThreadPool(8) as pool:
        results = pool.map(plus_one_f, [x])

with Flow('My Functional Flow') as flow:
    r = random_number()
    y = plus_one(x=r)

flow.run()
Kevin Kho
10/08/2021, 2:50 PM
Maybe you can use the LocalDaskExecutor instead, which spins up a multiprocessing pool? Otherwise, the only workaround I have seen is passing the Prefect logger directly to that function.
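A minimal sketch of the "pass the logger directly" workaround, for illustration only. To keep it runnable without Prefect installed, a standard `logging` logger stands in for the Prefect one; inside a real task the logger would presumably be fetched once in the task's own thread with `prefect.context.get("logger")` (the call used in the reproduction above) and then handed to the pool workers. The extra `logger` parameter and the name `stand_in_logger` are assumptions, not part of the original code.

```python
import logging
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool


def plus_one_f(x, logger):
    # The logger arrives as an argument, so no context lookup happens
    # inside the worker thread.
    logger.info(f"Plus One F : {x}")
    return x + 1


def plus_one(x, logger):
    # In a Prefect task, `logger` would be obtained in the task thread
    # (assumption: via prefect.context.get("logger")) before the pool starts.
    with ThreadPool(8) as pool:
        # partial binds the logger so pool.map only has to supply x.
        return pool.map(partial(plus_one_f, logger=logger), [x])


logging.basicConfig(level=logging.INFO)
# Hypothetical stand-in for the Prefect logger.
stand_in_logger = logging.getLogger("stand_in_for_prefect_logger")
results = plus_one(41, stand_in_logger)
```

Binding the logger with `functools.partial` keeps the worker signature explicit while leaving the `pool.map` call unchanged, which may feel a little less unclean than threading the parameter through by hand.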
Sylvain Hazard
10/08/2021, 2:52 PM
Thanks for the tip, I'll try this out! Figured I could pass the logger as a parameter but it felt quite unclean 😕
Zach Angell
10/08/2021, 2:59 PM
@Sylvain Hazard yeah, unfortunately prefect.context needs to be threadsafe, so any threads you spawn yourself will need custom logic to pass the context.
As Kevin mentioned, Prefect's executors have logic to ensure context is passed around correctly to threads/processes
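To illustrate what "custom logic to pass the context" could look like, here is a rough sketch that does not use Prefect at all. It assumes the context behaves like thread-local storage, which is what the message above implies; `context`, `with_context`, and `logger_name` are hypothetical names, with a plain `threading.local` object standing in for `prefect.context`.

```python
import threading
from multiprocessing.dummy import Pool as ThreadPool

# Hypothetical stand-in for a thread-local context object.
context = threading.local()


def worker(x):
    # Reads the value from whichever thread this call runs in.
    return (x + 1, getattr(context, "logger_name", None))


def with_context(snapshot, func):
    # Wrap func so the captured values are re-installed in the worker
    # thread before func runs there.
    def wrapper(*args, **kwargs):
        for key, value in snapshot.items():
            setattr(context, key, value)
        return func(*args, **kwargs)
    return wrapper


# Set in the calling thread only.
context.logger_name = "task-logger"
snapshot = {"logger_name": context.logger_name}

with ThreadPool(2) as pool:
    # Pool threads start with an empty thread-local, so the value is missing.
    without_ctx = pool.map(worker, [1, 2])

with ThreadPool(2) as pool:
    # The wrapper copies the snapshot into each worker thread first.
    with_ctx = pool.map(with_context(snapshot, worker), [1, 2])
```

In this sketch `without_ctx` carries `None` for the logger name while `with_ctx` carries the captured value, which mirrors the behaviour described in the thread: values set in the task's thread are not visible in threads you spawn yourself unless you copy them over.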