Hello again from Chicago :wave: My question: &gt...
# prefect-community
Hello again from Chicago 👋 My question:
Is there any way to write a custom Storage class that extends one of the Prefect built-in Storage classes?
Background: I'm trying to extend prefect.environments.storage.S3 , to add some custom logic. I don't need to change anything in the interface, but I want to change the behavior of one or more methods in a my-use-case-specific way. Code samples and more context in thread.
👋 1
This toy example reproduces the issue I'm facing
Copy code
from prefect.serialization.storage import S3Schema
from prefect.environments.storage import S3
import os

class SomeCustomS3Storage(S3):
    def __init__(self, *args, **kwargs):
        print("hello from Chicago")
        super().__init__(*args, **kwargs)

import prefect
import os
from prefect import task, Flow
from prefect.environments import KubernetesJobEnvironment
from prefect.client import Client


def hello_task():
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Hello, Cloud!")

flow = Flow("plase-work", tasks=[hello_task])

flow.storage = SomeCustomS3Storage(

env = KubernetesJobEnvironment(
        "image": "prefecthq/prefect:all_extras-0.12.0"

ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
Working through the Prefect code base, I think my issue is coming from the fact the fact that flow serialization / deserialization is based on class _name_: https://github.com/PrefectHQ/prefect/blob/d52cc24981a43ea12b7c6d10c6280aa24aa3efc0/src/prefect/serialization/storage.py#L105 Since I'm not changing the schema for
at all (just its methods), I tried this horrible thing just to see what would happen
Copy code
from prefect.serialization.storage import StorageSchema
from prefect.serialization.storage import S3Schema
    "SomeCustomS3Storage": S3Schema
This gets past the validation in my local call to
, but since it doesn't patch anything in the version of
running in Prefect Cloud, I get this
ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': "Invalid flow: {'storage': {'type': ['Unsupported value: SomeCustomS3Storage']}}", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I've looked through this Slack, the open issues on GitHub, and the docs and haven't found a discussion of sub-classing Storage like this. Am I trying to do something bad and unsupported?
ohhhhh ok I see now that
is just a dictionary representation of the flow, and that
needs to be able to recreate instances of the flow classes from that dictionary.
Copy code
import json
'{"name": "plase-work", "type": "prefect.core.flow.Flow", "schedule": null, "parameters": [], "tasks": [{"inputs": {}, "retry_delay": null, "cache_key": null, "outputs": "typing.Any", "tags": [], "name": "hello_task", "skip_on_upstream_skip": true, "cache_validator": {"fn": "prefect.engine.cache_validators.never_use", "kwargs": {}}, "timeout": null, "trigger": {"fn": "prefect.triggers.all_successful", "kwargs": {}}, "type": "prefect.tasks.core.function.FunctionTask", "max_retries": 0, "slug": "df28a210-ef8b-4551-b386-a32cd4e436c5", "cache_for": null, "auto_generated": false, "__version__": "0.12.0"}], "edges": [], "reference_tasks": [], "environment": {"labels": ["s3-flow-storage"], "executor_kwargs": {}, "metadata": {}, "executor": "prefect.engine.executors.LocalExecutor", "__version__": "0.12.0", "type": "RemoteEnvironment"}, "__version__": "0.12.0", "storage": {"bucket": "prefect-d94f436a-25b1-1699546c3", "key": null, "client_options": null, "secrets": [], "flows": {"plase-work": "plase-work/2020-06-19t16-53-47-549940-00-00"}, "__version__": "0.12.0", "type": "SomeCustomS3Storage"}}'
it’s not necessarily bad but unfortunately it’s not even possible -> because of the Hybrid model,
is the interface through which Prefect loads your custom Flow code. Before we’ve loaded your code though, we can’t use your custom code! (it’s a classic chicken and egg problem)
that means that
is the one thing that users must use officialy library versions of
Thanks! Ok yeah I think I talked myself into understanding how this works. I had the wrong mental model for what the serialized flow was / did, and that led me down a snowballing path of bad assumptions haha The thing I was trying to accomplish was basically a storage class where
builds the flow locally and pushes it over the wire to a REST API, and then where
hits that same REST API to read the flow back in. That way, flow runs wouldn't have to have AWS creds or even know about AWS, they would just use "some service they have access to" as the storage layer. But it looks like that isn't possible, and I imagine adding even a general-purpose version of something like that would be a big a lift. Is that idea worth documenting in a feature request? Or do you think it's too heavy?
I don’t think it would be too difficult to implement a generic “WebServer” storage type as you describe honestly; if you want to open a feature request for it we can discuss whether there are any gotchas!
ok great, thank you! I'll open that up in a bit.
for those finding this from Slack search, I've opened https://github.com/PrefectHQ/prefect/issues/2835
💯 1