# prefect-community
r
Hi, I'd be interested in chatting to anyone who's used prefect in a case where they have large amounts of intermediary data (> 1GB -> 10GB+), that needs passing between tasks
z
Hi! I’ve just started out at Prefect so I haven’t used it in this case yet, but I was working with (non-prefect) tasks before this where I was passing 10-1000GB of data. This gets a little infrastructure specific, what are you running Prefect on (or intending to)?
r
Hi @Zanie, currently running on one very large AWS machine. What's your experience with moving the data? E.g. how long does it take?
z
Ah so for my work I needed to scale up and down a lot so my dream setup was Dask on Kubernetes with a high performance network filesystem. I think anytime you’re working with that much data persisting it to a file system is the way to go. At the scale I was working with there were ~100TB file systems so we used Amazon FSx/LUSTRE to get very high performance network enabled file systems. I’m not sure if that’d be as reasonable if you’re just working with ~10GB of data.
r
What read/write speeds were you getting with FSx/LUSTRE? I'm using attached SSDs and getting about 1 GB/s (bytes, not bits).
z
LUSTRE uses a raid array of disks in its backend so it scales per disk (roughly 1TiB each), AWS promises 200 MB/s/TiB baseline, up to 1.3 GB/s/TiB burst
(for the fastest tier of SSD backed FSx — since it’s been rolled out they’ve added slower cheaper options)
r
yeah ok nice. I'm getting about the same speed on my SSDs, so it's not a major benefit to me
but I can see where it would come in if you have LOTS of data
z
It depends on how many TiB you have
r
^^^
yes - exactly
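As a rough back-of-the-envelope check on the per-TiB figures quoted above (200 MB/s/TiB baseline, up to 1.3 GB/s/TiB burst), the sketch below estimates aggregate throughput for a given file system size. It assumes throughput scales linearly with provisioned capacity and treats 1 GB as 1000 MB; the function and constant names are made up for illustration.

```python
# Figures quoted in the thread for the fastest SSD-backed tier.
BASELINE_MB_S_PER_TIB = 200
BURST_MB_S_PER_TIB = 1300  # 1.3 GB/s, taking 1 GB = 1000 MB


def estimated_throughput_mb_s(capacity_tib, per_tib_mb_s=BASELINE_MB_S_PER_TIB):
    """Aggregate throughput in MB/s, assuming linear scaling with capacity."""
    return capacity_tib * per_tib_mb_s


for tib in (1, 5, 100):
    baseline = estimated_throughput_mb_s(tib)
    burst = estimated_throughput_mb_s(tib, BURST_MB_S_PER_TIB)
    print(f"{tib} TiB: baseline {baseline} MB/s, burst {burst} MB/s")
```

Under these assumptions, a file system needs roughly 5 TiB before its baseline matches a local SSD doing about 1 GB/s.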
z
But yeah, 1 GB/s seems pretty good. If you’re not using a ton of disk and you need more speed you may want to look into high memory instances and pass what you can in memory.
r
yes - doing that. Sadly we aren't really using Prefect's features
so I'm trying to find out if I've missed something
@Zanie so would you say that using Prefect in my case might actually get in the way? It's not meant for this?
z
I think Prefect is intended to solve the orchestration of your tasks and lower the complexity of dealing with failure cases. It’s not necessarily going to solve problems that are intertwined with your execution environment / infrastructure because we can’t optimize for all cases.
👍 1
Can you clarify:
• What you mean when you say that you aren’t using Prefect's features if data is passed in memory
• How many tasks are running concurrently passing 10GB of data
• What kind of data you’re working with: structured/dataframes/blobs?
a
Does all the data have to be loaded into memory? I typically stream records in and out of S3 processing one line at a time with a buffer of bytes being pulled from S3 at a time.
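A minimal sketch of the streaming pattern described above: pull a fixed-size buffer of bytes at a time and hand records to the caller one line at a time, so the whole object never sits in memory. The `iter_lines` helper is hypothetical, and an in-memory `io.BytesIO` stands in for the S3 object body; any binary stream with a `read(size)` method should work the same way, though nothing here actually talks to S3.

```python
import io


def iter_lines(stream, chunk_size=64 * 1024):
    """Yield lines (without the trailing newline) from a binary stream,
    reading at most chunk_size bytes per call."""
    pending = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        pending += chunk
        # Everything before the last newline is complete; keep the remainder.
        *lines, pending = pending.split(b"\n")
        for line in lines:
            yield line
    if pending:
        # Final record with no trailing newline.
        yield pending


# Stand-in for a remote object body; a tiny chunk size forces several reads.
body = io.BytesIO(b"a,1\nb,2\nc,3")
total = sum(int(line.split(b",")[1]) for line in iter_lines(body, chunk_size=4))
print(total)
```

Memory use is bounded by the chunk size plus the longest single record, which is the point of the approach; it fits record-oriented data better than operations that need a whole array or dataframe at once.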
r
@Zanie thanks for that.
• I'm not really using any of the horizontal scaling capabilities; we are on one box using the LocalDaskExecutor. We also don't use the retry mechanism, as a failure in our model means something is wrong.
• I was running about 24 concurrent tasks, each passing around lots of data. I was getting lots of warnings about agents and memory.
• I'm working with dataframes and numpy arrays.
Totally understand Prefect cannot solve all problems, just trying to evaluate what I've discovered and whether it's the right fit for my problem
z
I think that Prefect will solve this issue by scaling horizontally. With a single machine, you’re scaling vertically and either will have to increase your filesystem speed or memory size to handle the types of jobs you’re talking about. Personally, I’d pass references between prefect tasks and use a tool designed for fast access to large data patterns to work on those references within each task. Since you’re using structured data (arrays/dataframes), you’re in luck! this is something that has been worked on a lot and there are many options, e.g.
• https://docs.dask.org/en/latest/dataframe.html
• https://arrow.apache.org/docs/index.html
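One way to picture "passing references" is sketched below. Plain Python functions stand in for Prefect tasks, and the standard library `array` module stands in for numpy, Dask, or Arrow, so this is an illustration of the idea under those assumptions rather than a recommended implementation. The function names and the file name are hypothetical.

```python
import os
import tempfile
from array import array


def simulate(path, n):
    """Producer step: write n doubles to disk and return only the path."""
    data = array("d", (i * 0.5 for i in range(n)))
    with open(path, "wb") as f:
        data.tofile(f)
    return path  # a short string is all that moves between steps


def summarise(path):
    """Consumer step: load the data behind the reference and reduce it."""
    n = os.path.getsize(path) // array("d").itemsize
    data = array("d")
    with open(path, "rb") as f:
        data.fromfile(f, n)
    return sum(data) / len(data)


scratch = tempfile.mkdtemp()
ref = simulate(os.path.join(scratch, "sim_0.bin"), 1000)
mean = summarise(ref)
print(ref, mean)
```

Only the path crosses the step boundary, so the orchestrator never has to serialize the large payload itself; the cost moves to the read and write inside each step, which is where a fast local disk or a columnar format would matter.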
r
i'll look at those! what about the potential for the movement of the data offsetting the benefit from horizontal scaling?
z
That’s a good question! In that case, purely using Prefect “results” to dump data to disk from task to task could be quite slow: you’d either depend on a fast network file system or be limited by the speed of something like S3, which sounds too slow for your application. By itself, dask works hard to schedule tasks where the data already lives to reduce data movement, but that would require using dask distributed objects, and we don’t have first-class support for those within our result objects because it creates complex cases during flow rescheduling. I will admit that I don’t have a strong understanding of how you’d make that work within Prefect. Large-data support is something I’m interested in improving in the long run but there’s a lot of complexity there!
r
thanks @Zanie. So my model generates its own data (Monte Carlo sims). So yes, localised data in a traditional Hadoop map/reduce style would work great, since the data sits near the processing, but I'm the source of our data so I'm already near the data - if that makes sense. Moving it elsewhere to then process it will incur a cost.
z
That makes sense! It may be worth disabling checkpointing (see the arg at https://docs.prefect.io/api/latest/core/task.html#task-2) since it sounds like handling failure cases isn’t a priority, and then managing the data how you see fit. I’m not sure I can do much more for you now though 🙂
r
thanks @Zanie!!