Zac Chien
09/16/2021, 7:56 AM
Zach Angell
Zac Chien
09/17/2021, 8:58 AM
Zac Chien
09/20/2021, 11:22 AM
We have wit-labeling, which accepts a list of label names as input. We also have a Python function consisting of three steps (calls to boto3 S3 and Athena functions) that implements the logic for a single label. In wit-labeling, threading is used for speed, but we also want to see the details of each labeling step, partly for debugging when a downstream job doesn't work properly.
We are still in development, and we have found that we rely heavily on the logs. 🥺
Zach Angell
wit-labeling?
Jessica Smith
09/21/2021, 4:05 PM
Zach Angell
Jessica Smith
09/21/2021, 4:25 PM
with ThreadPool(20) as pool:
    results = pool.map(run_extract_process, records)
Jessica Smith
09/21/2021, 4:25 PM
logger.info(f"Starting extract for {v}")
Jessica Smith
09/21/2021, 4:26 PM
from multiprocessing.dummy import Pool as ThreadPool
Jessica Smith
09/21/2021, 4:40 PM
# if its not during a backend flow run, don't emit
if not context.get("running_with_backend"):
    return
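For anyone reading without the Prefect source at hand, the guard quoted above can be imitated with a standard-library handler. This is only a sketch under assumptions: GuardedHandler, the plain context dict, and the emitted list are hypothetical stand-ins, not Prefect's actual classes.

```python
import logging

# Hypothetical stand-in for prefect.context; a plain dict is enough here.
context = {}


class GuardedHandler(logging.Handler):
    """Collects messages only while the context flag is set."""

    def __init__(self):
        super().__init__()
        self.emitted = []

    def emit(self, record):
        # Same shape as the guard quoted above: skip records outside a backend run.
        if not context.get("running_with_backend"):
            return
        self.emitted.append(record.getMessage())


handler = GuardedHandler()
logger = logging.getLogger("guard_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo from also printing through the root logger
logger.addHandler(handler)

logger.info("dropped: flag not set")
context["running_with_backend"] = True
logger.info("kept: flag set")
```

Only the second message reaches handler.emitted, which would match the symptom of log lines silently disappearing when the flag is absent.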
Jessica Smith
09/21/2021, 4:43 PM
Jessica Smith
09/21/2021, 4:43 PM
with prefect.context({"running_with_backend": True}):
    logger.info(f"Starting extract for {v}")
Zach Angell
prefect.context is thread-local, so the context is lost when running in a thread pool
Zach Angell
Pass prefect.context to run_extract_process as an arg and then re-initialize it in run_extract_process:
def run_extract_process(record, prefect_context):
    with prefect.context(prefect_context):
        # ... everything else
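The suggestion above can be tried without Prefect installed. A minimal sketch, assuming a plain threading.local object behaves like prefect.context for this purpose; ctx, use_context, worker_without_context, and worker_with_context are hypothetical names made up for the demo.

```python
import threading
from contextlib import contextmanager
from multiprocessing.dummy import Pool as ThreadPool

# Hypothetical stand-in for prefect.context: attributes are per-thread.
ctx = threading.local()


@contextmanager
def use_context(values):
    """Temporarily set attributes on the current thread's view of ctx."""
    missing = object()
    previous = {k: getattr(ctx, k, missing) for k in values}
    for k, v in values.items():
        setattr(ctx, k, v)
    try:
        yield
    finally:
        # Restore whatever this thread had before entering the block.
        for k, old in previous.items():
            if old is missing:
                delattr(ctx, k)
            else:
                setattr(ctx, k, old)


def worker_without_context(record):
    # Pool threads start with an empty thread-local, so the flag is missing.
    return getattr(ctx, "running_with_backend", False)


def worker_with_context(args):
    record, values = args
    # Re-initialize the context inside the worker thread.
    with use_context(values):
        return getattr(ctx, "running_with_backend", False)


records = ["a", "b", "c"]
with use_context({"running_with_backend": True}):
    # Snapshot the values in the main thread, then hand them to each worker.
    snapshot = {"running_with_backend": ctx.running_with_backend}
    with ThreadPool(2) as pool:
        lost = pool.map(worker_without_context, records)
        kept = pool.map(worker_with_context, [(r, snapshot) for r in records])
```

Because pool.map passes one argument per item, the record and the context snapshot are bundled into a tuple here; functools.partial would work as well.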
Jessica Smith
09/22/2021, 2:21 PM
Zach Angell
Jessica Smith
09/22/2021, 2:37 PM