Michelle Brochmann06/14/2022, 3:10 AM
I am doing operations on Spark and would like to pass a `DataFrame` object between tasks. If the `DataFrame` is too large to fit in memory, simply passing a custom serializer won't work. So I am thinking I could create a subclass of `Result` whose read/write methods are overridden to use the Spark read/write methods.
1. Is there any reason this wouldn't work or wouldn't be recommended?
2. Are there any best practices for doing something like this?
To create a `DataFrame` in the `Result` used by my flow, I would need to use the `SparkSession` in the read/write methods. I'm not sure how I would plumb that in to the `Result` (since it isn't passed the `SparkSession` the way a task is), although I suppose I could access a `SparkSession` some other way.
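The subclassing idea being asked about could be sketched roughly like this. This is a minimal sketch, not Prefect's actual API: the base class below is a simplified stand-in for Prefect 1.x's `prefect.engine.result.Result`, and plain text files stand in for the real `df.write.parquet(location)` / `spark.read.parquet(location)` calls so the sketch runs without a `SparkSession`:

```python
import os
import tempfile

# Simplified stand-in for Prefect 1.x's Result interface (illustrative only).
class Result:
    def __init__(self, location=None):
        self.location = location
        self.value = None

class SparkParquetResult(Result):
    """Hypothetical Result subclass: persist via Spark writes, not pickling.

    In real use, write() would call df.write.parquet(location) and read()
    would call spark.read.parquet(location); text files stand in here.
    """

    def write(self, value, **kwargs):
        location = self.location.format(**kwargs)
        with open(location, "w") as f:       # stand-in for df.write.parquet
            f.write(str(value))
        out = SparkParquetResult(location=location)
        out.value = value
        return out

    def read(self, location):
        out = SparkParquetResult(location=location)
        with open(location) as f:            # stand-in for spark.read.parquet
            out.value = f.read()
        return out

# Usage: write under a templated location, then read it back.
loc = os.path.join(tempfile.mkdtemp(), "df-{task}.txt")
r = SparkParquetResult(location=loc)
written = r.write("rows", task="a")
assert r.read(written.location).value == "rows"
```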
Michelle Brochmann06/14/2022, 1:32 PM
• … to run Spark.
• Good info about not using Results except for disaster recovery - you saved me going down a bit of a rabbit hole! 🙂
• I do look forward to hearing if @Kevin Kho has any recommendations or pointers to resources around using Spark with Prefect!
For most of the tasks that require a Spark DataFrame, it doesn't make sense to use a lot of tasks, because each task will just add to the Spark execution graph until an action on the DataFrame is called. I would suggest you keep your code modular with functions, and just use functions inside tasks. I would recommend you not use the Prefect result interface here. Also note that `Result`s are paired with `Serializer`s, and the default `Serializer` is cloudpickle, so if you want to write your own result, just pair it with a custom `Serializer` that does nothing. The default cloudpickle will throw an error when it tries to pickle the Spark DataFrame.
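A "`Serializer` that does nothing" could look like the sketch below. This is written against a simplified version of Prefect 1.x's `Serializer` interface (`serialize`/`deserialize`); the class name is illustrative, not a Prefect class:

```python
class NoOpSerializer:
    """Skips pickling entirely: serialize to empty bytes, deserialize to None.

    Pairing a custom Result with something like this keeps cloudpickle from
    choking on a Spark DataFrame, at the cost that nothing is actually
    stored -- downstream code must re-load the data itself.
    """

    def serialize(self, value):
        return b""        # discard the DataFrame instead of pickling it

    def deserialize(self, value):
        return None       # nothing was stored, so nothing comes back

# Usage: a round trip through the serializer yields None by design.
s = NoOpSerializer()
assert s.deserialize(s.serialize(object())) is None
```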
`SparkSession.builder.getOrCreate()` gets the current Spark session if there is one already. Not by design, but it just works that way. So you can get the `SparkSession` inside the result interface (I think).
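A small helper along these lines could be called from inside a custom `Result`'s read/write methods. The helper name is made up; `SparkSession.builder.getOrCreate()` is the real pyspark call, and the import is deferred so the module loads even where pyspark isn't installed:

```python
def current_spark_session():
    """Illustrative helper for use inside a custom Result's read/write.

    SparkSession.builder.getOrCreate() returns the already-active session
    when one exists, so the Result does not need the session plumbed in.
    """
    from pyspark.sql import SparkSession  # deferred: pyspark assumed installed at call time
    return SparkSession.builder.getOrCreate()
```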
Michelle Brochmann06/14/2022, 4:08 PM
So I want to confirm that pairing my `Result` with a `Serializer` "that does nothing" is the way to achieve what I want (Option D in the attached PDF). If Task A returns a DataFrame which Task B depends on, but also writes it to disk in the body of the task, and I set the serializer to return nothing, then I could check in Task B if the input DataFrame is whatever it would be if it was deserialized from nothing (I guess I'd have to create the deserializer to return None or an empty DataFrame), and if that's the case (like if I'm restarting the Flow from Task B), then I would read it from disk? Some of my colleagues really like the idea of subclassing the `Result` (Option C) though, so I'm trying to figure out if it's just "gently not recommended" or if something really could blow up if we do it that way! 😛
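The restart check described above might look like the following. All names here are hypothetical, and a plain function stands in for `spark.read.parquet` so the sketch runs without Spark:

```python
def read_from_disk(location):
    # stand-in for spark.read.parquet(location)
    return f"dataframe loaded from {location}"

def task_b(df, location):
    # With a do-nothing deserializer, a restarted flow hands Task B
    # None instead of the DataFrame, so fall back to reading from disk.
    if df is None:
        df = read_from_disk(location)
    return df
```

On a fresh run `task_b` receives the live DataFrame and uses it directly; on a restart it receives `None` and re-loads from the location Task A wrote to.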
Just be mindful of how many times you are saving, though (if it becomes a bottleneck for you), but I think you are aware. So with Pandas specifically, doing something like:

```
@task
def mydf(...):
    df = ...
    return df
```

we find sometimes that the memory keeps increasing. The thing inside the task can still be held in memory, so the preferred approach is:

```
@task
def mydf(...):
    location = ...
    df = ...
    df.to_parquet(location)
    del df
    return location
```

which gives you more memory management. I am not exactly sure about Spark DataFrames, since Spark is lazy until the save anyway, so it might work perfectly fine:

```
@task
def mydf(...):
    location = ...
    spark_df = ...
    spark_df.write.parquet(location)
    return location
```

Here the DataFrame won't be persisted; it will just pass through memory. This also technically gives you "restart from failure" capabilities without having to use the Prefect `Result` interface.
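The "return a location, not the object" pattern above can be made concrete without pandas, Spark, or Prefect. In this sketch, JSON files stand in for parquet and plain functions stand in for tasks (all names are illustrative):

```python
import json
import os
import tempfile

def build_and_save(path):
    # Stand-in for a task that builds a large DataFrame:
    # persist it, drop the in-memory object, return only the location.
    data = {"rows": list(range(5))}
    with open(path, "w") as f:
        json.dump(data, f)
    del data            # free the in-memory object once persisted
    return path

def downstream(path):
    # A downstream task re-loads from the location, which is also what
    # makes restart-from-failure work without the Prefect Result interface.
    with open(path) as f:
        return json.load(f)

path = os.path.join(tempfile.mkdtemp(), "data.json")
assert downstream(build_and_save(path)) == {"rows": [0, 1, 2, 3, 4]}
```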
Michelle Brochmann06/14/2022, 6:29 PM
For example, you can copy the `PandasSerializer` here and make a Spark version. Ideally, you don't want a new `Result` type, because Prefect won't know how to retrieve it.
Michelle Brochmann06/14/2022, 6:33 PM
It's the `PandasSerializer` that contains the same code. Since it's just pushing down to Spark commands, I think the memory consumption will be the same as native Spark. For subclassing `Result`, it's more that we store that information in our database, and we won't know how to load something when you restart from failure.
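One hedged sketch of the "copy the `PandasSerializer` and make a Spark version" suggestion: since a Spark DataFrame cannot be turned into an in-memory byte blob the way a pandas one can, this version "serializes" the path it was written to instead of the data. The class is illustrative, not a Prefect class, and the real Spark calls are replaced with comments so it runs standalone:

```python
class SparkParquetPathSerializer:
    """Illustrative: 'serialize' a DataFrame by recording where it was written."""

    def __init__(self, path):
        self.path = path

    def serialize(self, df):
        # df.write.parquet(self.path)        # real Spark call would go here
        return self.path.encode("utf-8")     # store only the location as bytes

    def deserialize(self, blob):
        path = blob.decode("utf-8")
        # return spark.read.parquet(path)    # real Spark call would go here
        return path

# Usage: a round trip recovers the location, not the DataFrame itself.
s = SparkParquetPathSerializer("/tmp/out.parquet")
assert s.deserialize(s.serialize(None)) == "/tmp/out.parquet"
```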