z
Hi, I'm using Python's threading package in a Prefect task. The target function works properly, but the context logger inside the target function doesn't show up in the UI logs. Is there any way to capture those logs in Prefect?
z
Hey @Zac Chien - logging from within threads / subprocesses can be tricky. Could you share a bit more about how you're using the threading package?
z
@Zach Angell Thanks for your message! I'm tied up in weekly meetings today, so I'll reply later. Have a good day! 😀
Sorry for the late reply! We have a Prefect task named `wit-labeling` that accepts a list of label names as input. We also have a Python function that implements the labeling logic for a single label in three steps (calling boto3 S3 and Athena functions). In `wit-labeling`, threading is used to speed things up, but we also want to see the details of each labeling step, partly for debugging when a downstream job doesn't work properly. We're still in development and have found that we rely heavily on the logs. 🥺
z
Gotcha, any chance you can share a minimal version of the threading code in `wit-labeling`?
j
FYI, I'm running into the same kind of issue. I'm importing a pool from `multiprocessing.dummy` and using `pool.map` to run multiple threads. None of the logs make it to the UI.
z
Sorry to hear that @Jessica Smith, could you share a minimal example of how you're using the multiprocessing pool and logging?
j
This is how we're running the code in parallel:
```
with ThreadPool(20) as pool:
    results = pool.map(run_extract_process, records)
```
Then in `run_extract_process` I'm just trying to log the input variable, so I know which one is which:
```
logger.info(f"Starting extract for {v}")
```
And again, `ThreadPool` is from `multiprocessing.dummy`:
```
from multiprocessing.dummy import Pool as ThreadPool
```
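A quick stdlib-only check (the `where_am_i` and `collect` helpers are hypothetical) that `multiprocessing.dummy.Pool` offers the `multiprocessing.Pool` API on top of threads: every call runs in the parent process, just on a worker thread rather than the main one. So anything the main thread stores per thread is not automatically visible to the workers.

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool


def where_am_i(_):
    # Report which process and which thread executed this call
    return os.getpid(), threading.get_ident()


def collect(n_workers=4, n_items=8):
    with ThreadPool(n_workers) as pool:
        return pool.map(where_am_i, range(n_items))


if __name__ == "__main__":
    results = collect()
    # Every call ran in the parent process...
    print({pid for pid, _ in results} == {os.getpid()})  # True
    # ...but on worker threads, never on the main thread
    print(threading.get_ident() in {tid for _, tid in results})  # False
```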
The missing logs don't seem to make it past this check in `logging.py`:
```
# if its not during a backend flow run, don't emit
if not context.get("running_with_backend"):
    return
```
Sure enough, if I change my code to set that flag in the context around the log call, the message shows up in the UI:
```
with prefect.context({"running_with_backend": True}):
    logger.info(f"Starting extract for {v}")
```
z
Ahhhh, gotcha, that makes a lot of sense. `prefect.context` is thread-safe (each thread gets its own copy), so the context is lost when running in a thread pool.
I suspect you could pass `prefect.context` to `run_extract_process` as an argument and then re-initialize it inside `run_extract_process`:
```
import prefect

def run_extract_process(record, prefect_context):
    # Re-enter the caller's context (a dict captured in the calling thread)
    with prefect.context(prefect_context):
        ...  # everything else
```
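A stdlib-only sketch of the pass-and-re-enter pattern, assuming `prefect.context` behaves like a thread-local store. `Context` here is a hypothetical, simplified stand-in, not Prefect's class. One detail matters under that assumption: the values have to be copied into a plain dict in the calling thread (`snapshot` below), because a worker that reads the thread-local object itself only sees its own, empty copy.

```python
import threading
from contextlib import contextmanager
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool


class Context(threading.local):
    """Hypothetical, simplified stand-in for prefect.context (thread-local)."""

    def __init__(self):
        # threading.local re-runs __init__ in each new thread, so workers start empty
        self.values = {}

    def get(self, key, default=None):
        return self.values.get(key, default)

    @contextmanager
    def __call__(self, overrides):
        previous = self.values
        self.values = {**previous, **dict(overrides)}
        try:
            yield self
        finally:
            self.values = previous  # restore on exit, like a nested context


context = Context()


def run_extract_process(record, parent_context):
    # Re-enter the caller's context inside this worker thread
    with context(parent_context):
        return record, context.get("running_with_backend", False)


def main(records):
    with context({"running_with_backend": True}):
        snapshot = dict(context.values)  # captured in the calling thread
        with ThreadPool(4) as pool:
            # Without the snapshot, workers see only their own (empty) context
            bare = pool.map(lambda r: context.get("running_with_backend", False), records)
            passed = pool.map(partial(run_extract_process, parent_context=snapshot), records)
    return bare, passed


if __name__ == "__main__":
    print(main(["a", "b"]))
    # ([False, False], [('a', True), ('b', True)])
```

`functools.partial` binds the snapshot, so the `pool.map(...)` call keeps its original shape.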
j
I'll try that. To confirm, is this working as designed, meaning no changes will be made to support multithreading without passing the context around?
z
Correct. If you use custom threading logic within a task, we have no way of passing the necessary context to your new threads, so that has to be done with custom code. When you use a Prefect executor to run tasks with multithreading, Prefect takes care of this for you.
j
Perfect, that is exactly what I needed to know. Thanks!