
Matt Delacour

11/29/2022, 3:34 PM
👋 How can I "batch" my pipeline? I would like to run the full pipeline in batches of ~10k rows rather than as one massive update where a single failure would stop everything. So I would like to take a batch from get_user_ids(), run everything in parallel (LocalDaskExecutor), save the results to Redshift, and start again with the next batch. PS: Another approach would be to build "incremental" logic from the source data. PS2: I am using Prefect 1.0
Unsure whether Dynamic DAGs: Task Looping would overcomplicate things here 🤔
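(For reference, a minimal sketch of what the looping option could look like with Prefect 1.0's LOOP signal; fetch_user_id_batch, get_third_party_api_info, and save_to_redshift are hypothetical stand-ins for the real queries and tasks in this pipeline.)

```python
# Minimal sketch of Prefect 1.0 task looping ("Dynamic DAGs: Task Looping").
# The three helpers below are hypothetical placeholders, not Matt's real code.
import prefect
from prefect import task, Flow
from prefect.engine.signals import LOOP


def fetch_user_id_batch(offset, limit):
    # hypothetical: query the source table for the next `limit` user IDs
    return []


def get_third_party_api_info(user_id):
    # hypothetical: call the third-party API for one user
    return {"user_id": user_id}


def save_to_redshift(rows):
    # hypothetical: load one batch of rows into Redshift
    pass


@task
def process_in_batches():
    # task_loop_result carries state between iterations of the same task run
    payload = prefect.context.get("task_loop_result", {}) or {}
    offset = payload.get("offset", 0)

    user_ids = fetch_user_id_batch(offset=offset, limit=10_000)
    if not user_ids:
        return offset  # nothing left to process; the loop ends here

    rows = [get_third_party_api_info(uid) for uid in user_ids]
    save_to_redshift(rows)

    # raising LOOP re-runs this task with the next offset, so a failure only
    # affects the current ~10k batch rather than the whole run
    raise LOOP(
        message=f"Saved batch ending at offset {offset + len(user_ids)}",
        result={"offset": offset + len(user_ids)},
    )


with Flow("batched-pipeline") as flow:
    process_in_batches()
```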

Serina

11/29/2022, 7:13 PM
Off the top of my head: your get_user_ids task executes some SQL query, so maybe you could edit it to retrieve 10k user IDs where some flag is set to false in the table. Then you could have a final task in your flow that only runs if the save_to_redshift task completes successfully, perhaps updating the original table to set the flag for those user IDs to true. And provided you have some schedule as well, maybe add a statement in the flow like "while get_user_ids() returns some user IDs, execute the following tasks."
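(A rough sketch of that flag idea in Prefect 1.0 terms; the processed column, table name, and run_query helper are hypothetical, and the repetition comes from the schedule rather than an explicit while loop.)

```python
# Sketch of the flag-based incremental pattern: pull only unprocessed IDs,
# and flip the flag only after the Redshift save succeeds.
from prefect import task, Flow


def run_query(sql, params=None):
    # hypothetical: execute SQL against the source database
    return []


@task
def get_user_ids():
    # only pull user IDs that have not been processed yet
    return run_query("SELECT user_id FROM users WHERE processed = FALSE LIMIT 10000")


@task
def save_to_redshift(user_ids):
    # hypothetical: enrich and load the batch into Redshift
    pass


@task
def mark_processed(user_ids):
    # runs only if the upstream save succeeded (default all_successful trigger),
    # so a failed batch is picked up again on the next scheduled run
    run_query("UPDATE users SET processed = TRUE WHERE user_id = ANY(%s)", (user_ids,))


with Flow("incremental-batch") as flow:
    ids = get_user_ids()
    saved = save_to_redshift(ids)
    mark_processed(ids, upstream_tasks=[saved])
```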

Kalise Richmond

11/29/2022, 7:48 PM
Hey Matt, are you looking to do something where you chunk the get_user_ids() output into 10k batches, run the transformation get_third_party_api_info() over those chunks, and then, once all the chunks have run, either upload the entire result to Redshift or save each chunk's result to Redshift as it finishes?
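(If it ends up being the chunk-and-map route, a minimal sketch under those assumptions; every task body here is a hypothetical placeholder.)

```python
# Sketch of the chunk-and-map pattern on a LocalDaskExecutor: split the IDs
# into ~10k chunks, map the transformation over them, save each chunk.
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor


@task
def get_user_ids():
    # hypothetical: return the full list of user IDs from the source query
    return list(range(100_000))


@task
def chunk(user_ids, size=10_000):
    # split the full ID list into ~10k-row chunks
    return [user_ids[i:i + size] for i in range(0, len(user_ids), size)]


@task
def get_third_party_api_info(id_chunk):
    # hypothetical: enrich one chunk of user IDs via the third-party API
    return id_chunk


@task
def save_to_redshift(info_chunk):
    # hypothetical: load one enriched chunk into Redshift
    pass


with Flow("chunked-pipeline", executor=LocalDaskExecutor()) as flow:
    ids = get_user_ids()
    chunks = chunk(ids)
    infos = get_third_party_api_info.map(chunks)  # chunks run in parallel on Dask
    save_to_redshift.map(infos)                   # each chunk lands in Redshift as it finishes
```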

Matt Delacour

11/29/2022, 9:19 PM
Yes, you got it @Kalise Richmond, and then the flow starts again with the next 10k