
james.lamb

06/19/2020, 4:35 PM
Hello again from Chicago 👋 My question:
Is there any way to write a custom Storage class that extends one of the Prefect built-in Storage classes?
Background: I'm trying to extend prefect.environments.storage.S3 to add some custom logic. I don't need to change anything in the interface, but I want to change the behavior of one or more methods in a way that's specific to my use case. Code samples and more context in thread.
This toy example reproduces the issue I'm facing
from prefect.serialization.storage import S3Schema
from prefect.environments.storage import S3
import os

class SomeCustomS3Storage(S3):
    
    def __init__(self, *args, **kwargs):
        print("hello from Chicago")
        super().__init__(*args, **kwargs)

import prefect
import os
from prefect import task, Flow
from prefect.environments import KubernetesJobEnvironment
from prefect.client import Client

PROJECT_NAME="dask-iz-ok"
PREFECT_TEST_BUCKET=os.environ["PREFECT_TEST_BUCKET"]

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("plase-work", tasks=[hello_task])

flow.storage = SomeCustomS3Storage(
    bucket=PREFECT_TEST_BUCKET
)

env = KubernetesJobEnvironment(
    job_spec_file="prefect-flow-run.yaml",
    metadata={
        "image": "prefecthq/prefect:all_extras-0.12.0"
    }
)

flow.register(project_name=PROJECT_NAME)
ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
Working through the Prefect code base, I think my issue comes from the fact that flow serialization / deserialization is based on the class __name__: https://github.com/PrefectHQ/prefect/blob/d52cc24981a43ea12b7c6d10c6280aa24aa3efc0/src/prefect/serialization/storage.py#L105 Since I'm not changing the schema for S3 at all (just its methods), I tried this horrible thing just to see what would happen:
from prefect.serialization.storage import StorageSchema
from prefect.serialization.storage import S3Schema
StorageSchema.type_schemas.update({
    "SomeCustomS3Storage": S3Schema
})
This gets past the validation in my local call to flow.register(), but since it doesn't patch anything in the version of prefect running in Prefect Cloud, I get this ClientError:
ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': "Invalid flow: {'storage': {'type': ['Unsupported value: SomeCustomS3Storage']}}", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I've looked through this Slack, the open issues on GitHub, and the docs and haven't found a discussion of sub-classing Storage like this. Am I trying to do something bad and unsupported?
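Why the monkeypatch helps locally but not in Cloud can be shown with a toy model of name-keyed dispatch. This is a hypothetical sketch, not Prefect's real schema code: `ToyS3Schema`, `BUILTIN_TYPE_SCHEMAS`, `load_storage`, and the two registries are made up for illustration. The only point is that each process resolves the serialized `type` string against its own lookup table, so patching yours leaves the server's untouched.

```python
# Toy model (hypothetical, not Prefect's actual code) of name-keyed schema dispatch.

class ToyS3Schema:
    """Stand-in for a schema that knows how to load one storage type."""

    def load(self, data):
        return dict(data)  # pretend to validate and rebuild the object


# Registry keyed by class *name*, as built into the library.
BUILTIN_TYPE_SCHEMAS = {"S3": ToyS3Schema}


def load_storage(data, type_schemas):
    """Look up a schema by the serialized 'type' string and load with it."""
    type_name = data["type"]
    schema = type_schemas.get(type_name)
    if schema is None:
        raise ValueError(f"Unsupported value: {type_name}")
    return schema().load(data)


# Each process (your laptop, the server) holds its own copy of the registry.
local_registry = dict(BUILTIN_TYPE_SCHEMAS)
server_registry = dict(BUILTIN_TYPE_SCHEMAS)

payload = {"type": "SomeCustomS3Storage", "bucket": "my-bucket"}

# The monkeypatch only changes the registry in *this* process...
local_registry["SomeCustomS3Storage"] = ToyS3Schema
load_storage(payload, local_registry)  # passes locally

# ...while the server still resolves names against its own, unpatched table.
try:
    load_storage(payload, server_registry)
except ValueError as exc:
    print(exc)  # Unsupported value: SomeCustomS3Storage
```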
ohhhhh ok I see now that flow.serialize() is just a dictionary representation of the flow, and that prefect needs to be able to recreate instances of the flow classes from that dictionary.
import json
json.dumps(flow.serialize())
'{"name": "plase-work", "type": "prefect.core.flow.Flow", "schedule": null, "parameters": [], "tasks": [{"inputs": {}, "retry_delay": null, "cache_key": null, "outputs": "typing.Any", "tags": [], "name": "hello_task", "skip_on_upstream_skip": true, "cache_validator": {"fn": "prefect.engine.cache_validators.never_use", "kwargs": {}}, "timeout": null, "trigger": {"fn": "prefect.triggers.all_successful", "kwargs": {}}, "type": "prefect.tasks.core.function.FunctionTask", "max_retries": 0, "slug": "df28a210-ef8b-4551-b386-a32cd4e436c5", "cache_for": null, "auto_generated": false, "__version__": "0.12.0"}], "edges": [], "reference_tasks": [], "environment": {"labels": ["s3-flow-storage"], "executor_kwargs": {}, "metadata": {}, "executor": "prefect.engine.executors.LocalExecutor", "__version__": "0.12.0", "type": "RemoteEnvironment"}, "__version__": "0.12.0", "storage": {"bucket": "prefect-d94f436a-25b1-1699546c3", "key": null, "client_options": null, "secrets": [], "flows": {"plase-work": "plase-work/2020-06-19t16-53-47-549940-00-00"}, "__version__": "0.12.0", "type": "SomeCustomS3Storage"}}'
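The same point as a toy round-trip: the serialized form carries field values plus a `type` string, but no code. In this hypothetical sketch, the toy `S3` and `SomeCustomS3Storage` classes, the `serialize`/`deserialize` helpers, and the `LOADERS` table are stand-ins, not Prefect internals. Mapping the custom name onto the base class's loader (the toy analogue of the `type_schemas` patch) rebuilds a plain `S3`, so the overridden `get_flow()` never runs on the receiving side.

```python
import json


class S3:
    """Toy stand-in for a built-in storage class (not Prefect's S3)."""

    def __init__(self, bucket, key=None):
        self.bucket = bucket
        self.key = key

    def get_flow(self, flow_location):
        return f"downloaded {flow_location} from s3://{self.bucket}"


class SomeCustomS3Storage(S3):
    def get_flow(self, flow_location):
        return "custom logic"


def serialize(storage):
    # Only data and the class *name* survive; the methods do not.
    return {"bucket": storage.bucket, "key": storage.key, "type": type(storage).__name__}


# A deserializer that only knows built-in classes; the custom name is mapped
# onto the base class, mimicking the registry patch.
LOADERS = {"S3": S3, "SomeCustomS3Storage": S3}


def deserialize(data):
    cls = LOADERS[data["type"]]
    return cls(bucket=data["bucket"], key=data["key"])


original = SomeCustomS3Storage(bucket="my-bucket")
wire = json.dumps(serialize(original))  # what actually crosses the network
restored = deserialize(json.loads(wire))

print(json.loads(wire)["type"])         # SomeCustomS3Storage
print(type(restored).__name__)          # S3
print(restored.get_flow("plase-work"))  # downloaded plase-work from s3://my-bucket
```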

Chris White

06/19/2020, 4:57 PM
it’s not necessarily bad but unfortunately it’s not even possible -> because of the Hybrid model, Storage is the interface through which Prefect loads your custom Flow code. Before we’ve loaded your code though, we can’t use your custom code! (it’s a classic chicken and egg problem)
that means that Storage is the one thing for which users must use the official library versions

james.lamb

06/19/2020, 5:02 PM
Thanks! Ok yeah I think I talked myself into understanding how this works. I had the wrong mental model for what the serialized flow was / did, and that led me down a snowballing path of bad assumptions haha. The thing I was trying to accomplish was basically a storage class where build() builds the flow locally and pushes it over the wire to a REST API, and where .get_flow() hits that same REST API to read the flow back in. That way, flow runs wouldn't need AWS creds or even know about AWS; they would just use "some service they have access to" as the storage layer. But it looks like that isn't possible, and I imagine adding even a general-purpose version of something like that would be a big lift. Is that idea worth documenting in a feature request? Or do you think it's too heavy?
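A minimal sketch of the "push over the wire, read it back" idea, assuming a hypothetical REST service where `PUT {base_url}/flows/{name}` stores bytes and `GET {base_url}/flows/{name}` returns them. `WebServerStorage`, `add_flow`, and that endpoint layout are made up for illustration; only the `build()` / `get_flow()` names come from the thread. It is not a subclass of Prefect's `Storage`, and per the reply above it would have to ship inside the library to be usable by flow runs. It uses stdlib `pickle` where a real implementation might prefer `cloudpickle`.

```python
import pickle
import urllib.request
from urllib.parse import quote


class WebServerStorage:
    """Hypothetical storage that keeps pickled flows behind a plain HTTP API."""

    def __init__(self, base_url):
        self.base_url = base_url.rstrip("/")
        self._flows = {}  # flow name -> flow object waiting to be uploaded

    def _url(self, name):
        return f"{self.base_url}/flows/{quote(name)}"

    def add_flow(self, name, flow):
        # Register a flow locally; nothing is sent until build() is called.
        self._flows[name] = flow
        return self._url(name)

    def build(self):
        # Serialize each flow locally and PUT the bytes to the service.
        for name, flow in self._flows.items():
            req = urllib.request.Request(
                self._url(name),
                data=pickle.dumps(flow),
                method="PUT",
                headers={"Content-Type": "application/octet-stream"},
            )
            with urllib.request.urlopen(req) as resp:  # raises HTTPError on 4xx/5xx
                resp.read()
        return self

    def get_flow(self, name):
        # Read the bytes back from the same service; no AWS credentials involved.
        # Only unpickle data from a service you trust: pickle can execute code.
        with urllib.request.urlopen(self._url(name)) as resp:
            return pickle.loads(resp.read())
```

The design keeps the flow runner's only dependency as "an HTTP endpoint it can reach", which is the property the thread is after; authentication headers would be the obvious next addition.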

Chris White

06/19/2020, 5:33 PM
I don’t think it would be too difficult to implement a generic “WebServer” storage type as you describe honestly; if you want to open a feature request for it we can discuss whether there are any gotchas!

james.lamb

06/19/2020, 6:03 PM
ok great, thank you! I'll open that up in a bit.
for those finding this from Slack search, I've opened https://github.com/PrefectHQ/prefect/issues/2835