
Michelle Brochmann

06/14/2022, 3:10 AM
Hi, I have a question about subclassing the `Result` object. I am doing operations on Spark `DataFrames` in `Tasks` and would like to pass a `DataFrame` from `Task` to `Task` via the `Result` object. If the `DataFrame` is too large to fit in memory, simply passing a custom serializer won’t work. So I am thinking I could create a subclass of `Result` called `SparkDataFrameResult` where the `read` and `write` methods are overridden to use the Spark `load` and `save` methods.
1. Is there any reason this wouldn’t work or wouldn’t be recommended?
2. Are there any best practices for doing something like this?
Additional info: I use `@resource_manager` to create a `SparkSession` used by my `Tasks`. I would need to use this `SparkSession` inside the `Result` `read` and `write` methods. I’m not sure how I would plumb that in to `read` (since it isn’t passed the `**formatting_kwargs` like `write` is), although I suppose I could access a `SparkSession` some other way.
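For concreteness, a rough sketch of the kind of subclass being described, assuming the Prefect 1.x `Result` interface (`read`/`write`/`format`/`copy`); the class name and paths are hypothetical, and whether this is advisable is exactly what the rest of the thread discusses:
```python
# Hypothetical sketch only -- not an official Prefect Result type.
from typing import Any

from prefect.engine.result import Result
from pyspark.sql import SparkSession


class SparkDataFrameResult(Result):
    """Persist Spark DataFrames via df.write / spark.read instead of pickling."""

    def write(self, value_: Any, **kwargs: Any) -> "Result":
        new = self.format(**kwargs)                 # render the location template
        new.value = value_
        value_.write.parquet(new.location, mode="overwrite")
        return new

    def read(self, location: str) -> "Result":
        new = self.copy()
        new.location = location
        spark = SparkSession.builder.getOrCreate()  # reuse the active session
        new.value = spark.read.parquet(location)
        return new
```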

Anna Geller

06/14/2022, 11:24 AM
where is your spark cluster running? do you happen to use AWS EMR or Databricks?
Since you asked about best practices, I'd say that generally speaking you shouldn't use Results in your flow yourself - Results are more of a disaster recovery mechanism; they are used by Prefect to help you when your flow fails, e.g. to provide restart functionality from the UI. It would be better to build some custom logic to persist your data. Passing data between tasks works out of the box with Prefect, but as you noticed, you need to ensure your execution layer has enough memory to do that.
I'll let @Kevin Kho chime in once he starts later today - he's an expert in Spark/Dask/Dataframes so he may give you a better recommendation here

Michelle Brochmann

06/14/2022, 1:32 PM
• Right now we are just using `local[*]` to run Spark.
• Good info about not using Results except for disaster recovery - you saved me going down a bit of a rabbit hole! 🙂
• I do look forward to hearing if @Kevin Kho has any recommendations or pointers to resources around using Spark with Prefect!

Kevin Kho

06/14/2022, 1:59 PM
For Spark DataFrames with Prefect, there is a bit of awkwardness. For example, if you use results, it means you save a DataFrame after every task, so you force execution and don’t take advantage of the lazy evaluation. I would set `checkpoint=False` for most of the tasks that require a Spark DataFrame. It also doesn’t make sense to use a lot of tasks, because the tasks will just add to the Spark execution graph until an action on the DataFrame is called. I would suggest you keep your code modular with functions, and just use functions inside tasks. I would recommend you not use the Prefect result interface here. Also note that `Results` are paired with `Serializers` and the default `Serializer` is a `PickleSerializer`, so if you want to write your own result, just pair it with a custom `Serializer` that does nothing. The default cloudpickle will throw an error when it tries to pickle the Spark DataFrame.
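A minimal sketch of such a "do nothing" serializer, assuming the Prefect 1.x `Serializer` interface (the class name is hypothetical):
```python
from typing import Any

from prefect.engine.serializers import Serializer


class NoOpSerializer(Serializer):
    """Skips pickling entirely; the task itself handles persistence via Spark."""

    def serialize(self, value: Any) -> bytes:
        return b""    # nothing to store -- Spark already wrote the data elsewhere

    def deserialize(self, value: bytes) -> Any:
        return None   # downstream tasks re-read the data themselves
```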
I believe doing `SparkSession.builder.getOrCreate()` gets the current Spark session if there is one already. Not by design, but it just works that way. So you can get the `SparkSession` inside the result interface (I think).
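For example (the parquet path is hypothetical):
```python
from pyspark.sql import SparkSession

# If a session already exists in the process (e.g. created by the resource
# manager), builder.getOrCreate() returns it rather than building a new one.
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/tmp/intermediate.parquet")  # hypothetical location
```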

Michelle Brochmann

06/14/2022, 3:31 PM
Thanks @Kevin Kho, this is great context! @Anna Geller I want to make sure I understand what you mean by “I’d say generally speaking you shouldn’t use Results in your flow yourself” - do you just mean “don’t try to access the Results as Result objects from inside the flow”, or do you mean “Results are not intended to be used as Flow output”? (I.e. if I have a DataFrame that is the result of running my flow, is it better to use custom logic inside a Task to save that to an output location rather than have the DataFrame be the output of the final Task which is serialized to disk via a subclassed Result object?)

Kevin Kho

06/14/2022, 3:35 PM
I think she means the last part. Control the writing of the DataFrame yourself rather than relying on a Result to do it. The memory footprint of the Prefect flow is drastically lower if you do it.

Michelle Brochmann

06/14/2022, 4:08 PM
@Kevin Kho could you elaborate a bit more on the memory footprint aspect? Assuming that any DataFrame I pass between Tasks needs to be written, and I use the same mechanism to write it in my subclassed Result as I would outside (i.e. I’m not serializing the whole thing to memory at once), wouldn’t the overall memory footprint be about the same? One of the reasons I’m really liking the idea of a subclassed Result for a Spark DataFrame is that it would enable saving the DataFrame results to disk and allow disaster recovery without having to deserialize the DataFrame again if I need to use it in a downstream task! Is there another way to achieve this?
I thought a bit more and I think I see that your suggestion “just pair it with a custom `Serializer` that does nothing” is the way to achieve what I want (Option D in the attached PDF). So if Task A returns a DataFrame which Task B depends on, but also writes it to disk in the body of the task, and I set the serializer to return nothing, then I could check in Task B whether the input DataFrame is whatever it would be if it was deserialized from nothing (I guess I’d have to create the deserializer to return None or an empty DataFrame), and if that’s the case (like if I’m restarting the Flow from Task B), then I would read it from disk? Some of my colleagues really like the idea of subclassing the Result (Option C) though - so I’m trying to figure out if it’s just “gently not recommended” or if something really could blow up if we do it that way! 😛
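A rough sketch of that “Option D” pattern, assuming a do-nothing serializer whose `deserialize` returns `None` (task names, paths, and the `spark` argument are hypothetical):
```python
from prefect import task


@task
def task_a(spark, location="/data/intermediate.parquet"):
    df = spark.read.parquet("/data/raw.parquet")     # hypothetical source
    out = df.select("*")                             # ...transformations...
    out.write.parquet(location, mode="overwrite")    # persist inside the task body
    return out   # with a no-op serializer, Prefect persists nothing for this return


@task
def task_b(df, spark, location="/data/intermediate.parquet"):
    if df is None:   # e.g. restarted from here and deserialize returned None
        df = spark.read.parquet(location)
    return df.count()
```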

Kevin Kho

06/14/2022, 6:16 PM
The result can work for you for sure. Just use a serializer that does nothing and rely on `df.write.parquet()`. Just be mindful of how many times you are saving (if it becomes a bottleneck for you), but I think you are aware. So with Pandas specifically, doing something like:
```python
@task
def mydf():
    df = ...    # build the DataFrame
    return df   # the whole DataFrame is returned downstream
```
we find sometimes that the memory keeps increasing. The thing inside the task can still be held in memory, so the preferred approach is:
```python
@task
def mydf():
    df = ...                 # build the DataFrame
    location = ...           # hypothetical output path
    df.to_parquet(location)
    del df                   # drop the reference so memory can be reclaimed
    return location          # pass only the path downstream
```
which gives you better memory management. I am not exactly sure about Spark DataFrames since it’s lazy until the save anyway, so it might work perfectly fine.
But it’s wrong to say all things need to be written. That is true if you want granular “restart from failure”, but in general you can turn it off with `@task(checkpoint=False)` and it won’t be persisted. It will just pass through memory.
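For example (the task body is hypothetical):
```python
from prefect import task


# With checkpoint=False, Prefect does not persist the return value, so the
# lazy Spark DataFrame just passes through memory to the next task.
@task(checkpoint=False)
def add_filter(df):
    return df.filter("value > 0")
```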
I guess what I was saying also is that
```python
@task
def mydf():
    spark_df = ...                      # build the Spark DataFrame
    location = ...                      # hypothetical output path
    spark_df.write.parquet(location)
    return location
```
also technically gives you “restart from failure” capabilities without having to use the Prefect `Result` interface.

Michelle Brochmann

06/14/2022, 6:29 PM
Thanks @Kevin Kho!

Kevin Kho

06/14/2022, 6:29 PM
Ohh my gosh I typed so much and now understand the terminology confusion lol. Yes, what you really want is like `S3Result` with a `SparkSerializer`. For example, you can copy the `PandasSerializer` here and make a Spark version. Ideally, you don’t want a new `Result` type because Prefect won’t know how to retrieve it.
You might find the Result interface restrictive. I’m not sure. You can’t really make a new Result class, but you can make a new Serializer class.
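One possible shape for such a serializer (hypothetical, loosely modeled on the Prefect 1.x `PandasSerializer`; note the caveat raised in the next few messages that a `Serializer` is normally expected to return the object's bytes for the `Result` to write):
```python
from typing import Any

from prefect.engine.serializers import Serializer
from pyspark.sql import SparkSession


class SparkSerializer(Serializer):
    """Hypothetical: pushes the I/O down to Spark and only round-trips a path."""

    def __init__(self, location: str):
        self.location = location                    # hypothetical parquet path

    def serialize(self, value: Any) -> bytes:
        value.write.parquet(self.location, mode="overwrite")
        return self.location.encode()               # the Result stores just this path

    def deserialize(self, value: bytes) -> Any:
        spark = SparkSession.builder.getOrCreate()
        return spark.read.parquet(value.decode())
```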

Michelle Brochmann

06/14/2022, 6:33 PM
Hmm, the problem with the Serializer for us is that it requires the entire object to be held in memory - which (for the same reason we are using Spark and not pandas) might not be possible with our DataFrames - hence wanting to subclass the Result class (so I can control read/write as opposed to just serialize/deserialize). I thought subclassing the Result class would be possible - what are the pitfalls I should watch out for?

Kevin Kho

06/14/2022, 6:35 PM
The Serializer is the one that will contain the code for `df.write.parquet`. It’s the `PandasSerializer` that contains the same code, so it’s just pushing down to Spark commands; I think the memory consumption will be the same as native Spark. For subclassing Result, it’s more that we store that information in our database and we won’t know how to load something when you restart from failure.

Michelle Brochmann

06/14/2022, 6:37 PM
Hmm, that confuses me - I thought the Serializer converted from object to bytes and then the Result took care of writing it to the specified location!

Kevin Kho

06/14/2022, 6:39 PM
Oh I see, let me double check

Kevin Kho

06/14/2022, 6:42 PM
Look at this. It gets a `serialize_method` which is something like `to_csv` or `to_parquet` and then calls it inside the following `try`. I don’t think the bytes buffer is used? It’s just there to return something for uniformity, I think.

Michelle Brochmann

06/14/2022, 7:02 PM
I still think it’s the serialized bytes string returned from `serialize`, though - and then if, for example, this serializer were used with `LocalResult`, then here (`LocalResult.write`) is where that serialized bytes string would be retrieved and then written on line 118.
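Simplified, the flow being described looks roughly like this (paraphrased from memory of the Prefect 1.x behavior, not a verbatim copy of the source):
```python
def write_result(value, serializer, location: str):
    # The Serializer materializes the whole object as bytes in memory...
    binary_data = serializer.serialize(value)
    # ...and the Result (e.g. LocalResult.write) then writes those bytes out.
    with open(location, "wb") as f:
        f.write(binary_data)
```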

Kevin Kho

06/14/2022, 7:05 PM
Ah crud I understand what you are saying. Man this doesn’t seem friendly for Spark DataFrames then. I would just do it inside the task.