Tom Forbes

05/27/2021, 11:28 AM
And I’m slightly confused about how Results are expected to interact with external libraries like Dask. I’d like to save my Dask dataframe to Parquet somewhere, depending on which Result is configured. Each Task has a unique storage location, which can be a local directory or an S3 prefix, so how would I get this inside my task? I’d like to do:
@task()
def save_results(dataframe):
    dataframe.save_parquet(UNIQUE_TASK_LOCATION)
    return UNIQUE_TASK_LOCATION
or some such. But Results seem to be centred around handling the writing/pickling of data for you. Ideally I’d prefer not to care whether it’s an S3 prefix (for production) or a local directory (for debugging).

Kevin Kho

05/27/2021, 1:59 PM
If I understand correctly, the goal here is to get a location, which could be S3 or local, and save the dataframe to the appropriate place. Can save_parquet work with S3 through s3fs? If it does, and this setup already works for both S3 and local locations, then I think you can use it without worrying about the Results interface. The Results interface is just there to simplify saving to certain locations; you don't have to use it if the code above already works well.
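Something like this rough sketch is what I have in mind (assuming dask, pyarrow, and s3fs are installed -- dask's method is actually called to_parquet, and the bucket/paths below are placeholders):
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)

# Local directory while debugging
df.to_parquet("/tmp/results/my-task")

# S3 prefix in production (s3fs handles the upload; bucket name is made up)
df.to_parquet("s3://my-bucket/results/my-task")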
And yes, the Result interface is somewhat explicitly tied to S3/local. If you want to use it, you probably need some Parameter that tells you which Result class to use, and then use it in the task like:
from prefect import task
from prefect.engine.results import S3Result

@task()
def save_results(dataframe):
    # UNIQUE_TASK_LOCATION is a placeholder for the key you want to write to
    s3_result = S3Result(bucket='bucket_o_models', location=UNIQUE_TASK_LOCATION)
    my_saved_model_result = s3_result.write(dataframe)
    return my_saved_model_result.location

Tom Forbes

05/27/2021, 2:04 PM
The problem is that it doesn’t work - I don’t know how to get a unique storage location per task. I can use the task_run_id to construct something, but relative to what?
The whole point of the Result interface is that you don’t need to care (as much) about where your data is stored, letting you focus on how it’s operated on instead. Having to manually construct a unique data location per task, conditionally depending on whether you’re running locally or in production, defeats the whole purpose and makes it impossible to ensure consistency across different flows.

Zanie

05/27/2021, 2:57 PM
Hey @Tom Forbes -- there are a few ways you can go with this, so let's work through it together.
Here's an example:
import os

from prefect import Flow, task
from prefect.engine.results import LocalResult, GCSResult


# This will only work at runtime if using script-based storage
RESULT = (
    GCSResult(bucket="foo", location="{task_run_id}")
    if os.environ.get("PRODUCTION_RUN")
    else LocalResult(dir="/Users/mz/examples", location="test-result-{task_run_id}")
)


@task(result=RESULT)
def make_data():
    return [1, 2, 3, 4, 5]


with Flow("example-results") as flow:
    make_data()



if __name__ == "__main__":
    flow.run()
Running PREFECT__FLOWS__CHECKPOINTING=true python results.py generates results with names templated with the unique task run id in my local folder. If I add PRODUCTION_RUN=true, it fails because I don't have a "foo" bucket set up.
You can also use results directly in your task, rather than letting Prefect handle the checkpointing for you:
import os

import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult, GCSResult


RESULT_NAME = "{task_run_id}"
RESULT = (
    GCSResult(bucket="foo", location=RESULT_NAME)
    if os.environ.get("PRODUCTION_RUN")
    else LocalResult(dir="/Users/mz/examples", location=RESULT_NAME)
)


@task()
def make_data():
    data = [1, 2, 3, 4, 5]
    location = RESULT.write(data, **prefect.context)
    return location


with Flow("example-results") as flow:
    make_data()


if __name__ == "__main__":
    flow.run()
You could also move it into a function, so you can use pickle-based storage and the result type will be determined correctly at runtime:
def write_result(data):
    result = (
        GCSResult(bucket="foo", location=RESULT_NAME)
        if os.environ.get("PRODUCTION_RUN")
        else LocalResult(dir="/Users/mz/examples", location=RESULT_NAME)
    )
    return result.write(data, **prefect.context)
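For example, a task using that helper could look like this (just a sketch, reusing the imports from the example above; write_result returns the Result object from .write(), and its location attribute records where the data was written):
@task()
def make_data():
    data = [1, 2, 3, 4, 5]
    written = write_result(data)   # Result returned by .write(), location already templated
    return written.location        # downstream tasks receive the location string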

Tom Forbes

05/27/2021, 3:12 PM
Ok, thank you, but there are a few things here that could be improved.
Firstly: you only get a location attribute set when you write something using the Result interface, which doesn’t work with external libraries that might want to handle the writing themselves. You can of course create a serializer to do this, but it isn’t a great experience, because you end up in this weird situation where you need to expose all of the options the underlying save method uses, and you lose out on any error handling/recovery. Conceptually, all you want to do is allocate some kind of unique, persistent directory/prefix/whatever and pass it into the library.

Zanie

05/27/2021, 3:17 PM
Sorry, I'm not following what you're looking for

Tom Forbes

05/27/2021, 3:17 PM
Secondly, the need to configure a different result type per environment isn’t good. How would the local one work for multiple users editing runs? What’s the right directory to use? ~/.prefect/ doesn’t respect XDG_HOME. Do we need to repeat this in every flow? Why?
> Sorry, I’m not following what you’re looking for
I have a library, in this case it’s dask, that can save a dataframe to a location. I want the result of the task to be that location. Prefect has a nice interface for allocating a unique path for each task, but I can’t seem to use it here.

Zanie

05/27/2021, 3:22 PM
> Secondly, the need to configure a different result type per environment isn’t good. How would the local one work for multiple users editing runs? What’s the right directory to use? ~/.prefect/ doesn’t respect XDG_HOME. Do we need to repeat this in every flow? Why?
You can definitely just use os.path.expanduser instead of hard-coding it as I did in my example. Your flows can just use some shared code. At scale, users frequently write their flows, then import them all in CI and apply consistent settings to each of them before registering.
Regarding your use case, it sounds like you just want a custom result type. Why don't you write a subclass?
Also, you can pass your own serializer to any result class as far as I know, so you don't have to rely on Prefect's serializer choice as you described.
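As a rough sketch (not an official Prefect class; it assumes the Prefect 1.x Result base class in prefect.engine.result and lets dask handle the actual IO via fsspec/s3fs), a subclass could look something like:
import dask.dataframe as dd
from prefect.engine.result import Result


class DaskParquetResult(Result):
    """Sketch of a Result that lets Dask write parquet directly to the
    templated location (a local directory or an s3:// prefix)."""

    def write(self, value_, **kwargs):
        new = self.format(**kwargs)        # fills {task_run_id} etc. from context
        value_.to_parquet(new.location)    # dask streams its own partitions
        new.value = new.location           # the stored "value" is just the location
        return new

    def read(self, location):
        new = self.copy()
        new.location = location
        new.value = dd.read_parquet(location)
        return new

    # exists() is omitted here; it would be needed for targets/caching.
    # e.g. @task(result=DaskParquetResult(location="s3://my-bucket/{flow_name}/{task_run_id}"))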

Tom Forbes

05/27/2021, 5:49 PM
Sorry, I was writing a reply but I got sucked into a long meeting!
Right, so yes we could write and internally distribute a specific serialiser just for dask dataframes, but it’s pointless overhead and a very leaky abstraction, and one we’d need to add for every case we encounter.
The interface that makes sense for this task is simply:
dask_dataframe.save_parquet(context.unique_output_directory)
return context.unique_output_directory
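The closest workaround I can see is a hypothetical helper along these lines (unique_output_location and RESULT_BASE are made-up names; the unique path is just assembled from values in prefect.context):
import os

import prefect
from prefect import task


def unique_output_location():
    # Hypothetical helper, not a Prefect API: build a per-task-run prefix.
    base = os.environ.get("RESULT_BASE", os.path.expanduser("~/prefect-results"))
    return os.path.join(
        base,
        prefect.context.get("flow_name", "adhoc"),
        prefect.context.get("task_run_id", "local"),
    )


@task()
def save_results(dataframe):
    location = unique_output_location()   # local dir or s3:// prefix, depending on RESULT_BASE
    dataframe.to_parquet(location)
    return location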
And yes, we could of course use expanduser, but this misses the point: we shouldn’t have to care. Prefect does a great job of hiding this from you via its Result type, but only in limited cases. I can’t see a technical reason why you can’t expose a way to write to the location of a result without using the result itself. The output of a given task would be the location of the data, much like the cloud-based PrefectResult.
But this is unusable without the unwieldy Result interface, which works well for simple values but falls down with any library that provides a more efficient way to write its values somewhere. Take for example the built-in dataframe serializer: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/serializers.py#L196-L197
Pandas is more than capable of writing itself to a location in a given format. But instead we seem to write it to a BytesIO, consuming a bunch of extra memory, before funnelling that into a custom storage abstraction. It might be much more efficient to just call dataframe.to_XYZ("s3://…"), which would stream it into S3 (or potentially use a memory-mapped file locally).
We could of course write a serializer to do this, but the abstraction would need to support every single option Dask gives you when writing to remote storage, which smells like a leaky interface. It’s also much harder to reason about what is actually happening here: the result of the task looks like a Dask dataframe, but it’s actually a series of Parquet files somewhere that magically gets implicitly written and can be read back using a variety of other tools.
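To illustrate the difference (a simplified sketch; the bucket name is a placeholder and pyarrow/s3fs are assumed):
import io

import pandas as pd

df = pd.DataFrame({"x": range(10)})

# Roughly what the built-in serializer path does: buffer the whole file in
# memory, then hand the bytes to the Result backend to upload.
buf = io.BytesIO()
df.to_parquet(buf)
payload = buf.getvalue()

# What I'm describing: give pandas/dask the destination and let fsspec/s3fs
# stream it there directly.
df.to_parquet("s3://my-bucket/results/output.parquet")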
If this isn’t making sense, perhaps there’s a better place than Slack where I can write this down and have a discussion about it? I could create an issue, perhaps? Or is there a forum?

Marko Mušnjak

05/28/2021, 8:29 AM
I’ve been trying to figure out the logic around similar use cases as well (coming from the Luigi world). My best guess is that you would use whatever your regular method is for getting the Parquet files, and then output a result that contains the location, for any downstream consumers to use as they see appropriate.

Tom Forbes

05/28/2021, 11:15 AM
Yeah that’s exactly what we would like to do 💪

Zanie

05/28/2021, 2:30 PM
Sorry I've been slow to get back to this, lots to do.
I'll read through the issue.

Tom Forbes

05/28/2021, 4:09 PM
It’s ok! It’s more of a future design idea; I’m working around it for now, but it’s not nice. Happy to try to contribute some improvements here once we get more accustomed to the internals.
Did you have a chance to take a look @Zanie?

Zanie

06/15/2021, 3:11 PM
Yeah! Sorry, I've been working on a big release.
We're thinking about overhauling results/caching this year, so basically the answer is: yes, we can make this a lot better, and we will definitely take your use case into consideration.