Brock
11/16/2024, 8:05 PM
Nate
11/16/2024, 10:37 PM
Brock
11/16/2024, 10:54 PM
Nate
11/16/2024, 11:00 PM
process_pokemon_batch could call this and perhaps use some global concurrency limit to enforce this part:
> as long as the number of records being processed at a time was <= 5
though I'm not sure off the top of my head what the easiest way is to know "what lambdas are currently running"
Brock
11/17/2024, 12:45 AM
Brock
11/17/2024, 12:49 AM
Brock
11/18/2024, 5:46 PM
@flow(task_runner=ThreadPoolTaskRunner(max_workers=5), log_prints=True)
def job():
    result = schema_setup()
    collect_result = collect()
    post_ids = collect_result.get("post_ids")
    print(f"Posts to process: {len(post_ids)}")
    if len(post_ids) > 0:
        print("starting map operation over the identified posts")
        processed_posts = []
        for post_id in post_ids:
            processed_posts.append(ingest.submit({"post_id": post_id}))
        wait(processed_posts)
    else:
        print("No posts to complete, exiting now.")
In my case, the ingest task doesn't return anything (it's doing data pipeline work in a Google Cloud Function). I borrowed everything from here in the docs. The retries on the task did what I wanted on the error front, and seeing the tasks spawn as others completed is exactly what I wanted.
My question: I am not really wrapping my head around wait. I know that I am submitting tasks to the threadpool, but I'm not certain what wait(processed_posts) is really doing, as I borrowed that from the floors example. I get the idea that I am submitting the job to the threadpool, but do I need to use the list and wait(processed_posts), or can I just call ingest.submit({"post_id": post_id}) and move on?
Nate
11/18/2024, 5:52 PM
if len(post_ids) > 0:
    print("starting map operation over the identified posts")
    processed_posts = ingest.map([{"post_id": pid} for pid in post_ids]).wait()
else:
    print("No posts to complete, exiting now.")
but in general, wait or the .wait() on a future do the same thing: they just wait for the PrefectFuture (one or many) to resolve (i.e. for the work to actually happen in the thread, in your case)
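A rough standard-library analogy of that behavior, for illustration only: this uses `concurrent.futures` rather than Prefect, so it is not how Prefect implements `wait`, and `ingest` here is a hypothetical stand-in that just doubles its input. It shows that `submit` hands back a future immediately, and that `wait` is what blocks until the work has actually happened in the threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

# Event used to hold the workers back so we can inspect the futures first.
release = threading.Event()


def ingest(post_id: int) -> int:
    """Hypothetical stand-in for the real ingest task."""
    release.wait()  # block until the main thread sets the event
    return post_id * 2


with ThreadPoolExecutor(max_workers=5) as pool:
    # submit() returns right away with a future; nothing has finished yet.
    futures = [pool.submit(ingest, pid) for pid in range(8)]
    done_before_wait = sum(f.done() for f in futures)

    release.set()                   # let the workers proceed
    done, not_done = wait(futures)  # block until every future resolves

results = sorted(f.result() for f in done)
print(done_before_wait, len(not_done), results)
```

With `max_workers=5`, at most five of the eight calls run at once and the rest start as workers free up, which mirrors the "tasks spawn as others complete" behavior described earlier in the thread.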
> can I just call ingest.submit({"post_id": post_id}) and move on?
you can move on within the function scope, but if you exit the function scope where you submitted without resolving the futures, then Python will garbage collect those futures, as I also go over in the linked video
Brock
11/18/2024, 6:37 PM
Brock
11/20/2024, 2:03 AM
Nate
11/20/2024, 2:04 AM
Nate
11/20/2024, 2:05 AM
Brock
11/20/2024, 2:07 AM
Nate
11/20/2024, 2:16 AM