Michelle Brochmann (06/14/2022, 3:10 AM):
A question about the Result object: I am doing operations on Spark DataFrames in Tasks and would like to pass a DataFrame from Task to Task via the Result object. If the DataFrame is too large to fit in memory, simply passing a custom serializer won't work, so I am thinking I could create a subclass of Result called SparkDataFrameResult, where the read and write methods are overridden to use Spark's own load and save methods.
1. Is there any reason this wouldn’t work or wouldn’t be recommended?
2. Are there any best practices for doing something like this?
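(For reference, a minimal sketch of what such a subclass might look like, assuming the Prefect 1.x Result interface (read, write, format); the method bodies below are illustrative, not from the thread:)
```python
# Illustrative sketch only: a Result subclass whose read/write use Spark's
# own load/save instead of pickled bytes.
from prefect.engine.result import Result
from pyspark.sql import SparkSession

class SparkDataFrameResult(Result):
    def read(self, location: str) -> "Result":
        spark = SparkSession.builder.getOrCreate()
        new = self.copy()
        new.location = location
        new.value = spark.read.parquet(location)
        return new

    def write(self, value, **kwargs) -> "Result":
        new = self.format(**kwargs)  # render the location template
        value.write.mode("overwrite").parquet(new.location)
        new.value = value
        return new
```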
Michelle Brochmann (06/14/2022, 3:41 AM):
I use a @resource_manager to create a SparkSession used by my Tasks. I would need to use this SparkSession inside the Result read and write methods.
I’m not sure how I would plumb that into read (since it isn’t passed the **formatting_kwargs like write is), although I suppose I could access a SparkSession some other way.
Anna Geller: [replies not captured in this excerpt]
Michelle Brochmann (06/14/2022, 1:32 PM):
• I'm using local[*] to run Spark.
• Good info about not using Results except for disaster recovery - you saved me going down a bit of a rabbit hole! 🙂
• I do look forward to hearing if @Kevin Kho has any recommendations or pointers to resources around using Spark with Prefect!
Kevin Kho:
You can set checkpoint=False for most of the tasks that require a Spark DataFrame. It also doesn't make sense to use a lot of tasks, because each task just adds to the Spark execution graph until an action on the DataFrame is called. I would suggest you keep your code modular with functions, and just use functions inside tasks.
I would recommend you not use the Prefect Result interface here. Also note that Results are paired with Serializers, and the default Serializer is a PickleSerializer, so if you want to write your own Result, just pair it with a custom Serializer that does nothing. The default cloudpickle will throw an error when it tries to pickle the Spark DataFrame.
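(A minimal sketch of such a pairing, assuming the Prefect 1.x Serializer interface where serialize returns bytes and deserialize receives them; NoOpSerializer is a name made up here:)
```python
# Sketch only: a Serializer that stores nothing, so cloudpickle never
# touches the Spark DataFrame.
from prefect.engine.serializers import Serializer

class NoOpSerializer(Serializer):
    def serialize(self, value) -> bytes:
        return b""  # persist nothing instead of pickling the DataFrame

    def deserialize(self, value: bytes):
        return None  # nothing was stored, so nothing comes back
```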
Kevin Kho:
SparkSession.builder.getOrCreate() gets the current Spark session if there is one already. Not by design, but it just works that way. So you can get the SparkSession inside the result interface (I think).
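(So inside read or write, something like this should pick up the session the @resource_manager already created:)
```python
from pyspark.sql import SparkSession

# Returns the active session if one exists rather than building a new one.
spark = SparkSession.builder.getOrCreate()
```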
[a few short exchanges not captured]
Michelle Brochmann (06/14/2022, 5:50 PM):
I'd like to confirm that pairing with a “Serializer that does nothing” is the way to achieve what I want:
(Option D in the attached PDF)
So if Task A returns a DataFrame which Task B depends on, but Task A also writes it to disk in the body of the task, and I set the serializer to return nothing, then in Task B I could check whether the input DataFrame is whatever it would be if it was deserialized from nothing (I guess I'd have to make the deserializer return None or an empty DataFrame), and if that's the case (like if I'm restarting the Flow from Task B), read it from disk?
Some of my colleagues really like the idea of subclassing the Result (Option C) though - so I'm trying to figure out if it's just “gently not recommended” or if something really could blow up if we do it that way! 😛
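(As an illustration of Option D, not from the thread: a sketch assuming the no-op deserializer returns None; the path and task bodies are made up:)
```python
# Task A saves the DataFrame itself; the no-op serializer stores nothing,
# so on a restart Task B's input arrives as None and it falls back to disk.
from prefect import task
from pyspark.sql import SparkSession

LOCATION = "/tmp/task_a_output.parquet"  # illustrative path

@task
def task_a():
    spark = SparkSession.builder.getOrCreate()
    df = spark.range(100)  # stand-in for the real computation
    df.write.mode("overwrite").parquet(LOCATION)
    return df

@task
def task_b(df):
    if df is None:  # e.g. the Flow was restarted from Task B
        spark = SparkSession.builder.getOrCreate()
        df = spark.read.parquet(LOCATION)
    return df.count()
```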
Kevin Kho:
You can save in the task body with df.write.parquet(). Just be mindful of how many times you are saving, though (if it becomes a bottleneck for you), but I think you are aware.
So with Pandas specifically, doing something like:
```python
@task
def mydf(...):
    df = ...
    return df
```
we sometimes find that the memory keeps increasing. The object created inside the task can still be held in memory, so the preferred approach is:
```python
@task
def mydf(...):
    location = ...
    df = ...             # build the DataFrame as before
    df.to_parquet(location)
    del df               # drop the reference before the task returns
    return location
```
This gives you more control over memory.
I am not exactly sure about Spark DataFrames, since it's lazy until the save anyway, so it might work perfectly fine.
Kevin Kho:
You can use @task(checkpoint=False) and the return value won't be persisted; it will just pass through memory.
Kevin Kho:
```python
@task
def mydf(...):
    location = ...
    spark_df = ...       # build the Spark DataFrame
    spark_df.write.parquet(location)
    return location
```
This also technically gives you “restart from failure” capabilities without having to use the Prefect Result interface.
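(For example, a hypothetical downstream task that accepts the returned location; on a restart, only the saved files are needed:)
```python
from prefect import task
from pyspark.sql import SparkSession

@task
def load_df(location: str):
    # Re-read the DataFrame that the upstream task saved.
    spark = SparkSession.builder.getOrCreate()
    return spark.read.parquet(location)
```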
Michelle Brochmann (06/14/2022, 6:29 PM): [not captured]
Kevin Kho:
If you do want the Result interface, I'd suggest S3Result with a SparkSerializer. For example, you can copy the PandasSerializer here and make a Spark version. Ideally, you don't want a new Result type, because Prefect won't know how to retrieve it.
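(A sketch of that pairing, assuming Prefect 1.x Results accept a serializer argument; SparkSerializer here is a stub for the Spark version you would write yourself:)
```python
from prefect import task
from prefect.engine.results import S3Result
from prefect.engine.serializers import Serializer

class SparkSerializer(Serializer):
    """Hypothetical Spark analogue of PandasSerializer."""
    def serialize(self, value) -> bytes:
        raise NotImplementedError  # fill in with Spark-aware save logic

    def deserialize(self, value: bytes):
        raise NotImplementedError  # fill in with Spark-aware load logic

# A standard Result type, so Prefect knows how to retrieve it on restart.
@task(result=S3Result(bucket="my-bucket", serializer=SparkSerializer()))
def transform(df):
    ...
```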
[a short exchange not captured]
Kevin Kho:
It would just call df.write.parquet; it's the PandasSerializer that contains the analogous code. Since it's just pushing down to Spark commands, I think the memory consumption will be the same as native Spark.
For subclassing Result, it's more that we store that information in our database, and then we won't know how to load it when you restart from failure.
[a few short exchanges not captured]
Kevin Kho:
It grabs the serialize_method, which is something like to_csv or to_parquet, and then calls it inside the following try:
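(The snippet he posted next was not captured; paraphrasing the pattern he describes, not the verbatim Prefect source:)
```python
import io

class PandasLikeSerializer:
    """Paraphrase of the PandasSerializer pattern."""
    def __init__(self, file_type: str = "parquet"):
        self.file_type = file_type

    def serialize(self, value) -> bytes:
        serialize_method = getattr(value, f"to_{self.file_type}")  # e.g. to_parquet
        buffer = io.BytesIO()
        try:
            serialize_method(buffer)  # most writers accept a buffer
            return buffer.getvalue()
        except TypeError:
            # some writers (e.g. to_csv) return a string when given no buffer
            return serialize_method().encode()
```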
[remainder of the thread not captured]