Zac Chien
09/16/2021, 7:56 AM
Zach Angell
Zac Chien
09/17/2021, 8:58 AM
Zac Chien
09/20/2021, 11:22 AM
wit-labeling, which accepts a list of label names as input. We also have a Python function consisting of three steps (calls to boto3 S3 and Athena functions) that implements a single labeling operation. In wit-labeling, threading is used for speed; meanwhile, we want to see the details of each labeling step, including for debugging when a downstream job doesn't work properly.
Now we are in the development stage and have found that we rely heavily on the logs. 🥺
Zach Angell
wit-labeling?
Jessica Smith
09/21/2021, 4:05 PM
Zach Angell
Jessica Smith
09/21/2021, 4:25 PM
with ThreadPool(20) as pool:
    results = pool.map(run_extract_process, records)
Jessica Smith
09/21/2021, 4:25 PM
logger.info(f"Starting extract for {v}")
Jessica Smith
09/21/2021, 4:26 PM
from multiprocessing.dummy import Pool as ThreadPool
Jessica Smith
09/21/2021, 4:40 PM
# if it's not during a backend flow run, don't emit
if not context.get("running_with_backend"):
    return
Jessica Smith
09/21/2021, 4:43 PM
Jessica Smith
09/21/2021, 4:43 PM
with prefect.context({"running_with_backend": True}):
    logger.info(f"Starting extract for {v}")
Zach Angell
prefect.context is threadsafe (its values are stored per thread), so the context set in the main thread is lost when running in a thread pool
Zach Angell
prefect.context to run_extract_process as an arg and then re-initialize in run_extract_process
def run_extract_process(record, prefect_context):
    with prefect.context(prefect_context):
        # ... everything else
Jessica Smith
09/22/2021, 2:21 PM
Zach Angell
Jessica Smith
09/22/2021, 2:37 PM
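For anyone reading this thread later, here is a minimal sketch of the behavior discussed above that runs without Prefect installed. It is not Prefect's implementation: `ThreadLocalContext` is a hypothetical stand-in for `prefect.context` built on `threading.local`, and `BackendOnlyHandler` is a hypothetical logging handler that copies the `running_with_backend` check quoted earlier. The point is only to show why a log line emitted from a pool thread can be dropped, and how passing the context in as an argument and re-entering it inside the worker restores it.

```python
import logging
import threading
from contextlib import contextmanager
from multiprocessing.dummy import Pool as ThreadPool


class ThreadLocalContext(threading.local):
    """Hypothetical stand-in for prefect.context: each thread sees its own values."""

    def __init__(self):
        # Called once per thread on first access, so every thread starts empty.
        self.values = {}

    def get(self, key, default=None):
        return self.values.get(key, default)

    @contextmanager
    def __call__(self, new_values):
        # Temporarily overlay new_values on the current thread's context.
        previous = dict(self.values)
        self.values.update(new_values)
        try:
            yield self
        finally:
            self.values = previous


context = ThreadLocalContext()


class BackendOnlyHandler(logging.Handler):
    """Hypothetical handler that mimics the check quoted in the thread."""

    def __init__(self):
        super().__init__()
        self.emitted = []

    def emit(self, record):
        # If it's not during a backend flow run, don't emit.
        if not context.get("running_with_backend"):
            return
        self.emitted.append(record.getMessage())


handler = BackendOnlyHandler()
logger = logging.getLogger("extract_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False


def run_extract_process(record):
    # Runs in a worker thread, where the thread-local context starts out empty.
    logger.info(f"Starting extract for {record}")


def run_extract_process_with_context(args):
    record, parent_context = args
    # Re-initialize the context inside the worker thread.
    with context(parent_context):
        logger.info(f"Starting extract for {record}")


def run_all(records):
    """Return (messages kept without passing context, messages kept with it)."""
    with context({"running_with_backend": True}):
        snapshot = dict(context.values)  # plain copy taken in the main thread
        start = len(handler.emitted)
        with ThreadPool(4) as pool:
            pool.map(run_extract_process, records)
            middle = len(handler.emitted)
            pool.map(run_extract_process_with_context,
                     [(record, snapshot) for record in records])
        return handler.emitted[start:middle], handler.emitted[middle:]


if __name__ == "__main__":
    lost, kept = run_all(["a", "b", "c"])
    print("without context:", lost)
    print("with context:", sorted(kept))
```

Note that `pool.map` passes exactly one argument per item, so the sketch packs the record and the context copy into a tuple; `pool.starmap` or `functools.partial` would be alternatives for a two-argument function like the `run_extract_process(record, prefect_context)` suggested above.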