Tom Forbes
05/27/2021, 11:28 AM

```
@task()
def save_results(dataframe):
    dataframe.save_parquet(UNIQUE_TASK_LOCATION)
    return UNIQUE_TASK_LOCATION
```
or some such. But Results seem to be centred around handling the writing/pickling of data for you? Ideally I'd not have to care whether it's an S3 prefix (for production) or a local directory (for debugging).

Kevin Kho
05/27/2021, 1:59 PM

Does `save_parquet` work with S3 through s3fs? If it does and this setup works for both S3 and local locations already, then I think you can use this setup without worrying about the `Result` interface. The `Result` interface is just there to simplify saving to certain locations, but you don't have to use it if the code above works well already. `Result` is a bit explicitly tied to S3/Local. If you want to use the `Result` interface, you probably need some `Parameter` that tells you which `Result` class, and then use it in the task like
```
@task()
def save_results(dataframe):
    s3_result = S3Result(bucket='bucket_o_models')
    my_saved_model_result = s3_result.write(dataframe, location=UNIQUE_TASK_LOCATION)
    return UNIQUE_TASK_LOCATION
```
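Kevin's pattern above — pick a backing store per environment, write explicitly, and return the location — can be sketched without Prefect at all. A minimal stand-in (the `PRODUCTION_RUN` variable and `pick_writer` helper are illustrative, not Prefect's API):

```python
import os
import pickle
import tempfile

os.environ.pop("PRODUCTION_RUN", None)  # force the local branch for this demo


def pick_writer():
    """Illustrative stand-in for choosing a Result-like backend per environment."""
    if os.environ.get("PRODUCTION_RUN"):
        # In a real flow this branch would wrap S3Result(bucket=...).write
        raise NotImplementedError("production branch would write to S3")

    def write_local(value, location):
        # Pickle to a unique temp directory and hand back the path,
        # mirroring "write, then return the location" in the task above
        path = os.path.join(tempfile.mkdtemp(), location)
        with open(path, "wb") as f:
            pickle.dump(value, f)
        return path

    return write_local


writer = pick_writer()
saved_at = writer([1, 2, 3], "my-task-output")
```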
Tom Forbes
05/27/2021, 2:04 PM

I could use `task_run_id` to construct something, but relative to what?

Zanie
05/27/2021, 2:57 PM

```
import os

from prefect import Flow, task
from prefect.engine.results import LocalResult, GCSResult

# This will only work at runtime if using script-based storage
RESULT = (
    GCSResult(bucket="foo", location="{task_run_id}")
    if os.environ.get("PRODUCTION_RUN")
    else LocalResult(dir="/Users/mz/examples", location="test-result-{task_run_id}")
)


@task(result=RESULT)
def make_data():
    return [1, 2, 3, 4, 5]


with Flow("example-results") as flow:
    make_data()


if __name__ == "__main__":
    flow.run()
```
Running `PREFECT__FLOWS__CHECKPOINTING=true python results.py` generates results with names templated with the unique task run id in my local folder. With `PRODUCTION_RUN=true` it fails because I don't have a "foo" bucket set up.

If you want to manage the write yourself, you can do it within the task:

```
import os

import prefect
from prefect import Flow, task
from prefect.engine.results import LocalResult, GCSResult

RESULT_NAME = "{task_run_id}"
RESULT = (
    GCSResult(bucket="foo", location=RESULT_NAME)
    if os.environ.get("PRODUCTION_RUN")
    else LocalResult(dir="/Users/mz/examples", location=RESULT_NAME)
)


@task()
def make_data():
    data = [1, 2, 3, 4, 5]
    # Result.write returns a new Result; grab the templated location from it
    location = RESULT.write(data, **prefect.context).location
    return location


with Flow("example-results") as flow:
    make_data()


if __name__ == "__main__":
    flow.run()
```
Or factored into a shared helper:

```
def write_result(data):
    result = (
        GCSResult(bucket="foo", location=RESULT_NAME)
        if os.environ.get("PRODUCTION_RUN")
        else LocalResult(dir="/Users/mz/examples", location=RESULT_NAME)
    )
    return result.write(data, **prefect.context).location
```
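The `{task_run_id}` locations in both snippets come down to formatting a template string from the run context at write time. A stdlib sketch of that mechanism (the `context` dict here is a stand-in for `prefect.context`, not the real object):

```python
import uuid

# Stand-in for the runtime context Prefect supplies at write time
context = {"task_run_id": str(uuid.uuid4()), "flow_name": "example-results"}

# Same shape as the location templates given to LocalResult/GCSResult above
template = "test-result-{task_run_id}"

# The template is filled from context; str.format ignores unused keys
location = template.format(**context)
```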
Tom Forbes
05/27/2021, 3:12 PM

You only get the `location` attribute set when you write something using the result interface, which doesn't work with external libraries that might want to handle the writing themselves.
You can of course create a serializer to do this, but this isn't a great experience because you end up with this weird situation where you need to expose all of the options the underlying `save` method uses, and you lose out on any error handling/recovery.
And conceptually all you want to do is allocate some form of unique, persistent directory/prefix/whatever and pass this into the library.

Zanie
05/27/2021, 3:17 PM

Sorry, I'm not following what you're looking for

Tom Forbes
05/27/2021, 3:17 PM

`~/.prefect/` doesn't respect `XDG_HOME`, do we need to repeat this in every flow? why?
I have a library, in this case it's `dask`, that can save a dataframe to a location. I want the result of the task to be that location.

Zanie
05/27/2021, 3:22 PM

> Secondly the need to configure a different result type per environment isn't good. How would the local one work for multiple users editing runs? what's the right directory to use?
> `~/.prefect/` doesn't respect `XDG_HOME`, do we need to repeat this in every flow? why?
You can definitely just use `os.path.expanduser` instead of hard-coding it as I did for my example. Your flows can just use some shared code. At scale, users frequently write their flows then import them all in CI and apply consistent settings to each of them before registering.

Tom Forbes
05/27/2021, 5:49 PM

```
dask_dataframe.save_parquet(context.unique_output_directory)
return context.unique_output_directory
```
I can use `expanduser`, but this misses the point: we shouldn't have to care. Prefect does a great job of hiding this from you via its `Result` type, but only in limited cases.
I can't see a technical reason why you can't expose a way to write to the location of a result without using the result itself. The output of a given task would be the location of the data, much like the cloud-based `PrefectResult`. Dask has `dataframe.to_XYZ("s3://….")`, which would stream it into S3 (or potentially use a memory mapped file locally).

Marko Mušnjak
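What Tom is describing — "allocate some form of unique, persistent directory/prefix and pass it into the library" — might look like the following. This is a hypothetical sketch, not Prefect's API; the `RESULT_ROOT` variable and function name are made up for illustration:

```python
import os
import tempfile


def unique_output_directory(task_run_id):
    """Hypothetical: allocate a unique per-task-run output location.

    Returns an S3 prefix when a production root is configured,
    otherwise creates and returns a local directory.
    """
    root = os.environ.get("RESULT_ROOT")
    if root and root.startswith("s3://"):
        # S3 has no real directories; a unique key prefix is enough
        return f"{root.rstrip('/')}/{task_run_id}"
    base = root or tempfile.gettempdir()
    path = os.path.join(base, task_run_id)
    os.makedirs(path, exist_ok=True)
    return path
```

A task could then hand this location straight to the library, along the lines of Tom's `save_parquet` example, and return it as the task's result regardless of environment.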
05/28/2021, 8:29 AM

Tom Forbes
05/28/2021, 11:15 AM

Zanie
05/28/2021, 2:30 PM

Tom Forbes
05/28/2021, 4:09 PM

Zanie
06/15/2021, 3:11 PM