# ask-community
b
Prior to Prefect, I have used AWS Step Functions to manage pipelines of tasks. What is the comparable approach to Map, where I can take a list of inputs and, in blocks of N, send each item to a task (e.g. an AWS Lambda function)? I could loop over items one at a time, but I wasn't sure if there was a feature I am overlooking that gives us control similar to Map.
n
hey @Brock! just to make sure i understand, are you saying you have some list of inputs N, and for all i in N you want to send i to an independent lambda function to do some work?
b
Hey Nate. A more concrete example is that I would grab a batch of database ids, say 1000. I would operate on a block of five at a time. Within the block, a single id would be sent to a lambda function to be processed: one id per lambda. Once a task completed, another item would be grabbed, as long as the number of records being processed at a time stayed <= 5. Step Functions also has a setting for error tolerance, so that if one entry failed, we could keep the whole run from falling over.
n
hmm if I'm understanding what you're saying, it sounds like maybe you could use a pattern like this, where instead `process_pokemon_batch` could call this and perhaps use some global concurrency limit to enforce this part
> as long as the number of records being processed at a time was <= 5
though I'm not sure off the top what the easiest way is to know "what lambdas are currently running"
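For readers who want the shape of the requirement in code: the following is a minimal sketch in plain standard-library Python, not Prefect's or Step Functions' API. It shows "at most five in flight, and a failed item does not sink the run". `process_record`, `run_batch`, and the `tolerated_failures` budget are hypothetical names invented for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_record(record_id):
    """Hypothetical stand-in for sending one id to one Lambda."""
    if record_id == 13:
        # Simulate a single bad record.
        raise ValueError(f"record {record_id} failed")
    return record_id * 2


def run_batch(record_ids, max_concurrency=5, tolerated_failures=1):
    """Process every id with at most `max_concurrency` running at once.

    Failures are collected instead of aborting the batch; the batch only
    raises if more than `tolerated_failures` records failed.
    """
    succeeded, failed = {}, {}
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        # Submitting never blocks; the pool starts a queued item
        # as soon as one of its worker threads frees up.
        futures = {pool.submit(process_record, rid): rid for rid in record_ids}
        for future in as_completed(futures):
            rid = futures[future]
            try:
                succeeded[rid] = future.result()
            except Exception as exc:
                failed[rid] = exc
    if len(failed) > tolerated_failures:
        raise RuntimeError(f"{len(failed)} records failed, budget exceeded")
    return succeeded, failed


if __name__ == "__main__":
    ok, bad = run_batch(range(1, 21))
    print(f"{len(ok)} succeeded, failed ids: {sorted(bad)}")
```

The worker count is what caps concurrency here, so no separate bookkeeping of "what is currently running" is needed in this simplified setting.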
b
I will take a look, it's a "state/task" that can be defined within the Step Function. Map <- the docs if you are curious.
I appreciate the references, will peep
@Nate, fwiw, I was able to get really really close to what I was looking for via the code snippet below
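# Imports assumed for Prefect 3.x; they were not part of the original snippet
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

# schema_setup, collect, and ingest are tasks defined elsewhere (not shown)


@flow(task_runner=ThreadPoolTaskRunner(max_workers=5), log_prints=True)
def job():
    result = schema_setup()
    collect_result = collect()
    post_ids = collect_result.get("post_ids")
    print(f"Posts to process: {len(post_ids)}")

    if len(post_ids) > 0:
        print("starting map operation over the identified posts")
        # Submit one task run per post; each call returns a future right away
        processed_posts = []
        for post_id in post_ids:
            processed_posts.append(ingest.submit({"post_id": post_id}))
        # Block until every submitted future has finished
        wait(processed_posts)
    else:
        print("No posts to complete, exiting now.")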
In my case, the ingest task doesn't return anything (it's doing data pipeline work in a Google Cloud Function). I borrowed everything from here in the docs. The retries on the task did what I wanted on the error front, and seeing the tasks spawn as others completed is exactly what I wanted. My question: I am not really wrapping my head around `wait`. I know that I am submitting tasks to the thread pool, but I'm not certain what `wait(processed_posts)` is really doing, as I borrowed that from the floors example. I get the idea that I am submitting the job to the thread pool, but do I need to use the list and `wait(processed_posts)`, or can I just call `ingest.submit({"post_id": post_id})` and move on?
n
i go into these task methods here
and fwiw, what you have above is the same as
if len(post_ids) > 0:
    print("starting map operation over the identified posts")
    processed_posts = ingest.map([{"post_id": pid} for pid in post_ids]).wait()
else:
    print("No posts to complete, exiting now.")
but in general, `wait` or the `.wait()` on a future do the same thing, they just wait for the `PrefectFuture` (one or many) to resolve (i.e. for the work to actually happen in the thread, in your case)
> can I just call `ingest.submit({"post_id": post_id})` and move on?
you can move on within the function scope, but if you exit the function scope where you submitted without resolving futures, then python will garbage collect those futures, as I also go over in the linked video
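The submit-then-wait idea can be illustrated with the standard library's `concurrent.futures`, used here only as an analogy and not as Prefect's implementation. In this minimal sketch, `ingest` and `job` are hypothetical stand-ins: submitting hands back a future right away, and `wait` blocks until every future in the list has resolved.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def ingest(payload):
    """Hypothetical stand-in for the real task; pretends to do some I/O."""
    time.sleep(0.05)
    return payload["post_id"]


def job(post_ids):
    with ThreadPoolExecutor(max_workers=5) as pool:
        # submit() returns a future at once; the work happens in a
        # worker thread, not at the point of the call.
        futures = [pool.submit(ingest, {"post_id": pid}) for pid in post_ids]

        # wait() blocks here until every future has finished, then
        # returns the sets of finished and unfinished futures.
        done, not_done = wait(futures)

        results = sorted(f.result() for f in done)
        return results, len(not_done)


if __name__ == "__main__":
    print(job([3, 1, 2]))
```

One difference to keep in mind: the stdlib `with` block also blocks on exit until submitted work finishes, so this sketch only shows what waiting on a list of futures means. It does not reproduce what happens to unresolved Prefect futures when the submitting function returns.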
b
Oh awesome, I will give the video a look. As always, thanks for your help!
Unrelated, holy moly it is refreshing to see someone hop into an ipython terminal. hattip
n
catjam
i’m a big fan for quick scripting! it’s also good when trying to communicate/teach bc it’s hard to hide implicit context, “no sleight of hand!”
b
100%. Also, in a zillion years I never thought that your task or flow would just execute as straight python code within ipython. Your library is fantastic, and I'm slowly piecing together how it all stitches together. Awesome stuff.
n
awesome to hear, we try to stick as close to python as possible to avoid unpleasant surprises! appreciate the words - let me know if you have any other questions 👍