    Ryan Abernathey

    3 years ago
    Thanks @Chris White for stopping by the Pangeo ML Working Group meeting today. I’ve got a couple of follow-up questions. Let me know if any of these should be escalated to GitHub issues. The main question is about ResultHandler objects. In Pangeo, our I/O stack is something like Google Cloud Storage <- GCSFS <- Zarr <- Xarray. I would like a Prefect task to write data to GCS. The normal way I would do this (without Prefect) is:
    import gcsfs

    ds = ...  # create xarray Dataset
    # token (GCS credentials) and path (bucket path) are defined elsewhere
    gcfs_w_token = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
    gcsmap = gcsfs.GCSMap(path, gcs=gcfs_w_token)
    ds.to_zarr(gcsmap)
    Obviously I can do that from within a Prefect task, but it kind of seems like I should be using a ResultHandler. Can you point me to any examples of custom handlers? (Bonus points if they show how to use secure credentials.) Thanks again for an awesome tool.

    Chris White

    3 years ago
    Hey @Ryan Abernathey! Good question; at the end of the day, a result handler is simply an object with read / write methods that are inverses of each other (and it needs to be cloudpickle-able for running on dask). For example, here is our internal implementation of a GCS result handler: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/result_handlers/gcs_result_handler.py This implementation won’t be nearly as performant as using gcsfs, but should convey the idea. This handler also uses “Prefect Secrets”: when running locally, secrets are pulled from prefect.context, and can be set via environment variable (e.g., export PREFECT__CONTEXT__SECRETS="my-secret"). If you need added security, you could use an encryption package for parsing the secret.
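To make the read / write contract concrete, here is a minimal sketch of such an object. It is not Prefect's GCS handler: the class name LocalPickleResultHandler is hypothetical, it writes to a local temporary directory as a stand-in for a bucket, and it uses the standard library's pickle rather than cloudpickle. In a real flow one would presumably subclass the base class from prefect.engine.result_handlers instead of using a plain class.

```python
import os
import pickle
import tempfile
import uuid


class LocalPickleResultHandler:
    """Toy result handler: write() stores a value, read() loads it back."""

    def __init__(self, directory):
        # Keep only plain, picklable state (a path string) so the handler
        # itself can be serialized and shipped to workers.
        self.directory = directory

    def write(self, result):
        """Serialize `result` to a new file and return its location."""
        os.makedirs(self.directory, exist_ok=True)
        location = os.path.join(self.directory, f"{uuid.uuid4().hex}.pkl")
        with open(location, "wb") as f:
            pickle.dump(result, f)
        return location

    def read(self, location):
        """Inverse of write(): load the value stored at `location`."""
        with open(location, "rb") as f:
            return pickle.load(f)


# Round trip: what goes in through write() comes back out through read().
handler = LocalPickleResultHandler(tempfile.mkdtemp())
location = handler.write({"answer": 42})
restored = handler.read(location)
```

The handler holds no open connections or file objects, only a path, which is what keeps it serializable; a GCS-backed version would likely want to create its client lazily for the same reason.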

    Ryan Abernathey

    3 years ago
    This seems very useful. Thanks! How do I associate a result with a specific handler?

    Chris White

    3 years ago
    To actually trigger this result handler call, you need to “checkpoint” your Task (Prefect has a bias against storing data unnecessarily, unless users opt in). Two things are necessary to make checkpointing work:
    - tasks need to request checkpointing and set their result handler: @task(checkpoint=True, result_handler=my_handler())
    - the appropriate setting needs to be turned on via env var / config during execution: export PREFECT__FLOWS__CHECKPOINTING=true
    Task result handlers can be specified via the result_handler keyword as above.
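The two conditions above can be modeled with a small toy, shown here only to illustrate the opt-in logic. This is not how Prefect implements checkpointing internally: should_checkpoint, run_task, and InMemoryHandler are hypothetical names, and only the environment variable name comes from the message above.

```python
import os


class InMemoryHandler:
    """Hypothetical stand-in handler that keeps results in a dict."""

    def __init__(self):
        self.store = {}

    def write(self, result):
        key = f"result-{len(self.store)}"
        self.store[key] = result
        return key

    def read(self, location):
        return self.store[location]


def should_checkpoint(task_checkpoint, environ=None):
    """Return True only if the task opted in AND the global flag is on."""
    environ = os.environ if environ is None else environ
    flag = environ.get("PREFECT__FLOWS__CHECKPOINTING", "false")
    return bool(task_checkpoint) and flag.strip().lower() == "true"


def run_task(fn, handler, task_checkpoint, environ=None):
    """Run fn(); pass its result to handler.write() when checkpointing applies."""
    result = fn()
    location = None
    if should_checkpoint(task_checkpoint, environ):
        location = handler.write(result)
    return result, location


handler = InMemoryHandler()
enabled = {"PREFECT__FLOWS__CHECKPOINTING": "true"}

# Both conditions met: the result is written and a location comes back.
value, location = run_task(lambda: 1 + 1, handler, True, enabled)

# Task opted in, but the global setting is off: nothing is written.
_, skipped = run_task(lambda: 1 + 1, handler, True, {})
```

Either condition alone leaves the handler untouched, which matches the opt-in bias described in the message.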

    Ryan Abernathey

    3 years ago
    I’ll give this a try and report back. Thanks

    Chris White

    3 years ago
    Yeah, anytime! I’m super excited to hear the Pangeo group’s feedback and possibly work with you all to improve Prefect!
    @Marvin archive “How can I create and set a custom result handler?”