Michelle Brochmann
06/14/2022, 3:10 AM
I have a question about the Result object. I am doing operations on Spark DataFrames in Tasks and would like to pass a DataFrame from Task to Task via the Result object. If the DataFrame is too large to fit in memory, simply passing a custom serializer won't work, so I am thinking I could create a subclass of Result called SparkDataFrameResult where the read and write methods are overridden to use the Spark load and save methods.
1. Is there any reason this wouldn't work or wouldn't be recommended?
2. Are there any best practices for doing something like this?
I'm using a @resource_manager to create a SparkSession used by my Tasks. I would need to use this SparkSession inside the Result read and write methods. I'm not sure how I would plumb that into read (since it isn't passed the **formatting_kwargs like write is), although I suppose I could access a SparkSession some other way.
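For reference, a rough sketch of what that subclass might look like against the Prefect 1.x Result interface (hypothetical and untested; the copy() and location-templating details are assumptions):

from pyspark.sql import SparkSession
from prefect.engine.result import Result

class SparkDataFrameResult(Result):
    # Hypothetical Result that persists Spark DataFrames as parquet
    # instead of pickling them.

    def read(self, location):
        # getOrCreate() picks up the session the resource_manager built,
        # if one is already active in this process
        spark = SparkSession.builder.getOrCreate()
        new = self.copy()
        new.location = location
        new.value = spark.read.parquet(location)
        return new

    def write(self, value_, **kwargs):
        new = self.copy()
        new.location = new.location.format(**kwargs)  # render the templated location
        value_.write.parquet(new.location, mode="overwrite")
        new.value = value_
        return new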
Anna Geller
06/14/2022, 11:24 AM
Michelle Brochmann
06/14/2022, 1:32 PM
• I'm using local[*] to run Spark.
• Good info about not using Results except for disaster recovery - you saved me going down a bit of a rabbit hole! 🙂
• I do look forward to hearing if @Kevin Kho has any recommendations or pointers to resources around using Spark with Prefect!
Kevin Kho
06/14/2022, 1:59 PM
You can set checkpoint=False for most of the tasks that require a Spark DataFrame. It also doesn't make sense to use a lot of tasks, because the tasks will just add to the Spark execution graph until an action on the DataFrame is called. I would suggest you keep your code modular with functions, and just use functions inside tasks.
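For example, that modular shape could look something like this (function and column names are made up; a sketch, not from the thread):

from prefect import task

def clean(df):
    # plain function: only extends the Spark plan, no action is triggered
    return df.filter(df.amount > 0)

def enrich(df):
    return df.withColumn("amount_doubled", df.amount * 2)

@task(checkpoint=False)
def transform(df):
    # one task wrapping several plan-building functions
    return enrich(clean(df))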
I would recommend you not use the Prefect result interface here. Also note that Results are paired with Serializers, and the default Serializer is a PickleSerializer, so if you want to write your own result, just pair it with a custom Serializer that does nothing. The default cloudpickle will throw an error when it tries to pickle the Spark DataFrame.
SparkSession.builder.getOrCreate() gets the current Spark session if there is one already. Not by design, but it just works that way. So you can get the SparkSession inside the result interface (I think).
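A minimal sketch of such a do-nothing serializer, assuming Prefect 1.x's Serializer base class (untested):

from typing import Any
from prefect.engine.serializers import Serializer

class NoOpSerializer(Serializer):
    # The task persists the DataFrame itself, so the checkpoint
    # stores only a placeholder.

    def serialize(self, value: Any) -> bytes:
        return b""  # discard the DataFrame

    def deserialize(self, value: bytes) -> Any:
        return None  # nothing was stored; the caller reloads from disk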
Michelle Brochmann
06/14/2022, 3:31 PM
Kevin Kho
06/14/2022, 3:35 PM
Michelle Brochmann
06/14/2022, 4:08 PM
I want to make sure the “Serializer that does nothing” is the way to achieve what I want:
(Option D in the attached PDF)
So if Task A returns a DataFrame which Task B depends on, but also writes it to disk in the body of the task, and I set the serializer to return nothing, then in Task B I could check whether the input DataFrame is whatever it deserializes to from nothing (I guess I'd have to make the deserializer return None or an empty DataFrame), and if so (like if I'm restarting the Flow from Task B), read it from disk?
Some of my colleagues really like the idea of subclassing the Result (Option C) though - so I'm trying to figure out if it's just “gently not recommended” or if something really could blow up if we do it that way! 😛
Kevin Kho
06/14/2022, 6:16 PM
Yes, you can write it to disk yourself with df.write.parquet(). Just be mindful of how many times you are saving though (if it becomes a bottleneck for you), but I think you are aware.
So with Pandas specifically, doing something like:
@task
def mydf(...):
    df = ...
    return df
we sometimes find that the memory keeps increasing. The thing inside the task can still be held in memory, so the preferred approach is:
@task
def mydf(...):
    df = ...
    location = ...
    df.to_parquet(location)
    del df
    return location
which gives you better memory management.
I am not exactly sure about Spark DataFrames since it's lazy until the save anyway, so it might work perfectly fine.
You can also use @task(checkpoint=False) and it won't be persisted. It will just pass through memory.
Something like:
@task
def mydf(...):
    spark_df = ...
    location = ...
    spark_df.write.parquet(location)
    return location
also technically gives you “restart from failure” capabilities without having to use the Prefect Result interface.
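Putting those pieces together, a hedged sketch of the handoff (task names and the path are made up):

from prefect import Flow, task
from pyspark.sql import SparkSession

@task
def make_df(location: str) -> str:
    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100)  # stand-in for real work
    df.write.parquet(location, mode="overwrite")
    return location  # only the path is checkpointed

@task(checkpoint=False)
def use_df(location: str) -> None:
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(location)  # the downstream task reloads from disk
    df.count()  # trigger an action

with Flow("spark-parquet-handoff") as flow:
    use_df(make_df("/tmp/mydf.parquet"))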
Michelle Brochmann
06/14/2022, 6:29 PM
Kevin Kho
06/14/2022, 6:29 PM
You could use S3Result with a SparkSerializer. For example, you can copy the PandasSerializer here and make a Spark version. Ideally, you don't want a new Result type because Prefect won't know how to retrieve it.
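One possible shape for such a SparkSerializer (a sketch with a significant caveat: Prefect's Serializer contract is bytes in, bytes out, so this version round-trips through a driver-local pandas DataFrame and only works when the data fits in driver memory; for truly large frames, the write-parquet-inside-the-task pattern above is safer):

import io
from typing import Any
import pandas as pd
from prefect.engine.serializers import Serializer
from pyspark.sql import SparkSession

class SparkParquetSerializer(Serializer):
    def serialize(self, value: Any) -> bytes:
        # collect to the driver and encode as a single parquet blob
        buffer = io.BytesIO()
        value.toPandas().to_parquet(buffer)
        return buffer.getvalue()

    def deserialize(self, value: bytes) -> Any:
        spark = SparkSession.builder.getOrCreate()
        return spark.createDataFrame(pd.read_parquet(io.BytesIO(value)))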
Michelle Brochmann
06/14/2022, 6:33 PM
Kevin Kho
06/14/2022, 6:35 PM
The SparkSerializer would just call df.write.parquet. It's the PandasSerializer that contains the same kind of code. Since it's just pushing down to Spark commands, I think the memory consumption will be the same as native Spark.
For subclassing Result, it's more like we store that information in our database, and we won't know how to load something when you restart from failure.
Michelle Brochmann
06/14/2022, 6:37 PM
Kevin Kho
06/14/2022, 6:39 PM
Michelle Brochmann
06/14/2022, 6:39 PM
Kevin Kho
06/14/2022, 6:42 PM
The PandasSerializer takes a serialize_method, which is something like to_csv or to_parquet, and then calls it inside a try block:
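From memory, that serialize method looks approximately like this (paraphrased, not verbatim; attribute names are assumptions):

import io
from typing import Any
from prefect.engine.serializers import Serializer

class PandasSerializer(Serializer):  # excerpt, paraphrased
    def serialize(self, value: Any) -> bytes:
        # look up e.g. value.to_parquet or value.to_csv by method name
        serialization_method = getattr(value, self._serialize_method_name)
        buffer = io.BytesIO()
        try:
            serialization_method(buffer, **self.serialize_kwargs)
            return buffer.getvalue()
        except TypeError:
            # some pandas writers only accept text buffers
            string_buffer = io.StringIO()
            serialization_method(string_buffer, **self.serialize_kwargs)
            return string_buffer.getvalue().encode()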
Michelle Brochmann
06/14/2022, 7:02 PM
Kevin Kho
06/14/2022, 7:05 PM