https://prefect.io logo
Title
r

Ryan Abernathey

09/09/2019, 6:26 PM
Thanks @Chris White for stopping by the Pangeo ML Working Group meeting today. I’ve got a couple of follow-up questions. Let me know if any of these should be escalated to GitHub issues. The main question is about
ResultHandler
objects. In Pangeo, our I/O stack is something like Google Cloud Storage <- GCSFS <- Zarr -< Xarray. I would like a Prefect task to write data to GCS. The normal way I would do this (without Prefect) is:
python
ds = # ... create xarray Dataset
gcfs_w_token = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
gcsmap = gcsfs.GCSMap(path, gcs=gcfs_w_token)
ds.to_zarr(gcsmap)
Obviously I can do that from within a Prefect task, but it kind of seems like I should be using a
ResultHandler
. Can you point me to any examples of custom handlers? (Bonus points if they show how to use secure credentials.) Thanks again for an awesome tool.
😁 2
c

Chris White

09/09/2019, 6:41 PM
Hey @Ryan Abernathey! Good question; at the end of the day, a result handler is simply an object with
read
/
write
methods that are inverses of each other (and it needs to be cloudpickle-able for running on dask). For example, here is our internal implementation of a GCS result handler: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/result_handlers/gcs_result_handler.py This implementation won’t be nearly as performant as using
gcfs
, but should convey the idea. This handler also uses “Prefect Secrets” --> when running locally, secrets are pulled from
prefect.context
, and can be set via environment variable (e.g.,
export PREFECT__CONTEXT__SECRETS="my-secret"
). If you need added security, you could use an encryption package for parsing the secret.
r

Ryan Abernathey

09/09/2019, 6:43 PM
This seems very useful. Thanks! How do I associate a result with a specific handler?
👍 1
c

Chris White

09/09/2019, 6:43 PM
To actually trigger this result handler call, you need to “checkpoint” your Task (Prefect has a bias against storing data unnecessarily, unless users opt-in). Two things are necessary to make checkpointing work: - tasks need to request checkpointing and set their result handler:
@task(checkpoint=True, result_handler=my_handler())
- the appropriate setting needs to be turned on via env var / config:
export PREFECT__FLOWS__CHECKPOINTING=true
during execution
Task result handlers can be specified via the
result_handler
keyword as above
r

Ryan Abernathey

09/09/2019, 6:47 PM
I’ll give this a try and report back. Thanks
c

Chris White

09/09/2019, 6:49 PM
yea anytime! I’m super excited to hear the Pangeo group’s feedback and possible work with you all to improve Prefect!
@Marvin archive “How can I create and set a custom result handler?”