https://prefect.io logo
#prefect-community
Title
# prefect-community
m

Mike Nerone

07/30/2020, 4:39 PM
Greetings! I have an existing app that is in essence an ETL app, but it currently runs in one process. I now wish to scale it out, and I’m hoping to use Prefect to do that. My question is this, and please forgive me if this is a n00b question: the result set that ultimately comes out of the Extract is potentially far more than will fit in memory, so it’s currently implemented as an (async) generator that makes paginated requests so it can yield chunks to the Transform. It doesn’t seem as if this pattern is currently supported by Prefect, so can you give me any advice on “The Prefect Way” to handle a large-data problem like this? My first thought was that instead of being a generator, the Extract task could itself kick off a parametrized flow for each chunk of data, but that seems like a lot of overhead for potentially thousands (or even tens of thousands) of chunks (and I’m not positive a task can do that anyway). Is there perhaps some other mechanism I might use to stream data between longer-running tasks?
👀 1
Is the right answer to rely on something external? Like instead of an E->T->L flow, decouple the relationships so they can run in parallel, and communicate via message queues instead of using Prefect’s result-passing?
l

Laura Lorenz (she/her)

07/30/2020, 5:56 PM
Hi @Mike Nerone! In one sense I think it depends what you are actually `return`ing from your extract task once you move it into Prefect - for example right now you have it as the data itself (as a generator), but it could instead be pointers to the data in the form of a filename or row subset or something and the downstream transform task hydrates the data itself. I haven’t personally heard of people using this pattern but technically your result can be any Python object, including a generator, so you may be able to keep your code as is. If you do want to go down the slice-n-dice path, whether on the full result if you can get into a less memory-starved environment or on pointers to the result, you might be interested in mapping (https://docs.prefect.io/core/concepts/mapping.html#mapping) to generate many tasks programmatically for each chunk across the results of your extract (I also recommend the intro to this blog with some good visuals: https://medium.com/the-prefect-blog/map-faster-mapping-improvements-in-prefect-0-12-0-7cacc3f14e16).
m

Mike Nerone

07/30/2020, 6:12 PM
Thanks much, @Laura Lorenz (she/her) - I hadn't considered allowing the generator result itself to be passed downstream. But even if this works (the extract generator getting bundled up inside the transform generator, which the load iterates), my concern is that I haven't actually split up the work. The extract and transform did nothing but create their generators and exit almost immediately, while the load ends up being where all of the work is done, right?
l

Laura Lorenz (she/her)

07/30/2020, 6:17 PM
Ok gotcha ya, I see that your leading sentence indicated you purposefully want to scale it out now instead of keep the code as is! Definitely a core benefit behind Prefect is to be able to scale up to bigger distributed computation easily, so I think getting to a situation where your flow uses mapping to slice-n-dice your work into parallelisable pieces, and then configure to use a DaskExecutor against a dask cluster or DaskKubernetesEnvironment against a kubernetes cluster. You still will need to be able to load the result you are mapping over into memory, which is where you might get into the games mentioned about as to whether that result is the actual data or pointers to where the data is at that each of your little parallelized load tasks can operate against.
m

Mike Nerone

07/30/2020, 6:20 PM
I guess I can't map over a generator? :) To avoid fitting the whole result set into memory?
l

Laura Lorenz (she/her)

07/30/2020, 6:21 PM
Haha well I think you can, but I think it doesn’t solve your memory problems because when the mapped parent spawns its mapped children, I think it coerces it into a list 🙂
m

Mike Nerone

07/30/2020, 6:25 PM
Gotcha. Ok one more question for now then I think I'll be out of your hair (many apologies and much appreciated). You mentioned passing a filename. Am I correct in thinking this wouldn't be safe in distributed environments where the two tasks might be running on different hosts?
l

Laura Lorenz (she/her)

07/30/2020, 6:26 PM
Yes that’s correct; if it’s instead a key to a remote file store (like S3, Azure, GCP) that would be the ideal situation if you have stuff split into files and are running distributed.
m

Mike Nerone

07/30/2020, 6:37 PM
Ok, thanks so much for your time and thought, @Laura Lorenz (she/her)!
l

Laura Lorenz (she/her)

07/30/2020, 6:38 PM
No problem!! marvin
p

Pedro Machado

07/31/2020, 4:43 AM
Hi Mike. I have an ETL pipeline that reads a large file from a URL and streams it into s3. In my case, there is no need to transform the file on the fly but I could if needed using an iterator. I read several files like this in parallel using mapping. If you can parallelize the read process you could use a mapped task that returns the parameters for the paginated requests and your child task could read and process each chunk and save the output to cloud storage. Do you know the number of chunks in advance or do you have to iterate (get next page) until you finish?