Thread
#prefect-community
    james.lamb

    2 years ago
    Hello again from Chicago 👋 My question:
    Is there any way to write a custom Storage class that extends one of the Prefect built-in Storage classes?
    Background: I'm trying to extend prefect.environments.storage.S3 to add some custom logic. I don't need to change anything in the interface, but I want to change the behavior of one or more methods in a way that's specific to my use case. Code samples and more context in thread.
    This toy example reproduces the issue I'm facing:
    import os
    
    import prefect
    from prefect import task, Flow
    from prefect.environments import KubernetesJobEnvironment
    from prefect.environments.storage import S3
    
    
    class SomeCustomS3Storage(S3):
        """Subclass of the built-in S3 storage; the interface is unchanged."""
    
        def __init__(self, *args, **kwargs):
            print("hello from Chicago")
            super().__init__(*args, **kwargs)
    
    
    PROJECT_NAME = "dask-iz-ok"
    PREFECT_TEST_BUCKET = os.environ["PREFECT_TEST_BUCKET"]
    
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")
    
    
    flow = Flow("plase-work", tasks=[hello_task])
    
    flow.storage = SomeCustomS3Storage(
        bucket=PREFECT_TEST_BUCKET
    )
    
    # Note: env is defined but never assigned to flow.environment,
    # so the flow keeps its default environment.
    env = KubernetesJobEnvironment(
        job_spec_file="prefect-flow-run.yaml",
        metadata={
            "image": "prefecthq/prefect:all_extras-0.12.0"
        }
    )
    
    flow.register(project_name=PROJECT_NAME)
    ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')
    Working through the Prefect code base, I think my issue comes from the fact that flow serialization / deserialization is based on class name: https://github.com/PrefectHQ/prefect/blob/d52cc24981a43ea12b7c6d10c6280aa24aa3efc0/src/prefect/serialization/storage.py#L105 Since I'm not changing the schema for S3 at all (just its methods), I tried this horrible thing just to see what would happen:
    from prefect.serialization.storage import StorageSchema
    from prefect.serialization.storage import S3Schema
    StorageSchema.type_schemas.update({
        "SomeCustomS3Storage": S3Schema
    })
    This gets past the validation in my local call to flow.register(), but since it doesn't patch anything in the version of prefect running in Prefect Cloud, I get this ClientError:
    ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': "Invalid flow: {'storage': {'type': ['Unsupported value: SomeCustomS3Storage']}}", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I've looked through this Slack, the open issues on GitHub, and the docs and haven't found a discussion of sub-classing Storage like this. Am I trying to do something bad and unsupported?
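A toy, Prefect-free model of the problem described above, assuming (as the linked storage.py suggests) that the serialized "type" field holds a bare class name which each side maps back to a schema through its own lookup table. The classes S3 and SomeCustomS3Storage and the two registry dicts below are hypothetical stand-ins, not Prefect's actual code.

```python
# Toy model of name-keyed (de)serialization: the "type" field stores only the
# class name, and each process resolves that name in its OWN registry.
# Everything here is a hypothetical stand-in, not Prefect's implementation.


class S3:
    def __init__(self, bucket):
        self.bucket = bucket


class SomeCustomS3Storage(S3):
    pass


def serialize(storage):
    # Only the bare class name is recorded, like the "type" key in the flow JSON.
    return {"type": type(storage).__name__, "bucket": storage.bucket}


def deserialize(data, registry):
    cls = registry.get(data["type"])
    if cls is None:
        raise ValueError(f"Unsupported value: {data['type']}")
    return cls(bucket=data["bucket"])


client_registry = {"S3": S3}
server_registry = {"S3": S3}  # a separate process with its own copy

payload = serialize(SomeCustomS3Storage(bucket="my-bucket"))

# Patching the local registry makes local validation pass...
client_registry["SomeCustomS3Storage"] = S3
local_copy = deserialize(payload, client_registry)

# ...but the server never saw the patch, so it still rejects the payload.
try:
    deserialize(payload, server_registry)
    server_error = None
except ValueError as exc:
    server_error = str(exc)

print(type(local_copy).__name__)  # S3
print(server_error)               # Unsupported value: SomeCustomS3Storage
```

In this toy model the patched local lookup also hands back a plain S3 object, so the custom methods would be lost on the round trip even if the server accepted the payload.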
    ohhhhh ok I see now that flow.serialize() is just a dictionary representation of the flow, and that prefect needs to be able to recreate instances of the flow classes from that dictionary.
    import json
    json.dumps(flow.serialize())
    '{"name": "plase-work", "type": "prefect.core.flow.Flow", "schedule": null, "parameters": [], "tasks": [{"inputs": {}, "retry_delay": null, "cache_key": null, "outputs": "typing.Any", "tags": [], "name": "hello_task", "skip_on_upstream_skip": true, "cache_validator": {"fn": "prefect.engine.cache_validators.never_use", "kwargs": {}}, "timeout": null, "trigger": {"fn": "prefect.triggers.all_successful", "kwargs": {}}, "type": "prefect.tasks.core.function.FunctionTask", "max_retries": 0, "slug": "df28a210-ef8b-4551-b386-a32cd4e436c5", "cache_for": null, "auto_generated": false, "version": "0.12.0"}], "edges": [], "reference_tasks": [], "environment": {"labels": ["s3-flow-storage"], "executor_kwargs": {}, "metadata": {}, "executor": "prefect.engine.executors.LocalExecutor", "version": "0.12.0", "type": "RemoteEnvironment"}, "version": "0.12.0", "storage": {"bucket": "prefect-d94f436a-25b1-1699546c3", "key": null, "client_options": null, "secrets": [], "flows": {"plase-work": "plase-work/2020-06-19t16-53-47-549940-00-00"}, "version": "0.12.0", "type": "SomeCustomS3Storage"}}'
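Pulling the relevant pieces out of that JSON makes the mismatch concrete. This sketch uses only the standard library json module on a trimmed copy of the output above, and shows that the flow's own "type" is a fully qualified path while the storage "type" is just the subclass name, which the receiving side has to recognize on its own.

```python
import json

# Trimmed copy of the json.dumps(flow.serialize()) output above.
serialized = json.loads("""
{
  "name": "plase-work",
  "type": "prefect.core.flow.Flow",
  "environment": {"type": "RemoteEnvironment", "version": "0.12.0"},
  "storage": {
    "bucket": "prefect-d94f436a-25b1-1699546c3",
    "key": null,
    "flows": {"plase-work": "plase-work/2020-06-19t16-53-47-549940-00-00"},
    "version": "0.12.0",
    "type": "SomeCustomS3Storage"
  }
}
""")

storage = serialized["storage"]

# The flow's "type" is a dotted import path...
print(serialized["type"])  # prefect.core.flow.Flow
# ...but the storage "type" is a bare class name with no module information.
print(storage["type"])     # SomeCustomS3Storage
# "flows" appears to map each flow name to the location this storage wrote it to.
print(storage["flows"]["plase-work"])
```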
    Chris White

    2 years ago
    it’s not necessarily bad, but unfortunately it’s not even possible: because of the Hybrid model, Storage is the interface through which Prefect loads your custom Flow code. Before we’ve loaded your code, though, we can’t use your custom code! (it’s a classic chicken and egg problem)
    that means that Storage is the one thing for which users must use the official library versions
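One might ask whether recording a full import path instead of a bare class name would get around this. A stdlib-only sketch of that hypothetical design suggests not: the runner can import classes that ship with its own installed packages, but a user's subclass lives in a module that only becomes available after Storage has fetched the flow code. Here collections.OrderedDict stands in for a built-in class, and my_flow_code is a made-up module name; none of this is how Prefect actually resolves storage.

```python
import importlib


def resolve_class(dotted_path):
    """Import 'package.module.ClassName' and return the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# A class that ships with the runtime resolves fine (stdlib stand-in for a
# built-in Storage class).
builtin_cls = resolve_class("collections.OrderedDict")

# A user-defined subclass lives in the user's own code, which the runner has
# not loaded yet; fetching that code is exactly the job Storage was meant to do.
# "my_flow_code" is a hypothetical module name that is not installed here.
try:
    resolve_class("my_flow_code.SomeCustomS3Storage")
    custom_error = None
except ModuleNotFoundError as exc:
    custom_error = exc.name

print(builtin_cls.__name__)  # OrderedDict
print(custom_error)          # my_flow_code
```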
    james.lamb

    2 years ago
    Thanks! Ok yeah I think I talked myself into understanding how this works. I had the wrong mental model for what the serialized flow was / did, and that led me down a snowballing path of bad assumptions haha. The thing I was trying to accomplish was basically a storage class where build() builds the flow locally and pushes it over the wire to a REST API, and where .get_flow() hits that same REST API to read the flow back in. That way, flow runs wouldn't need AWS credentials or even know about AWS; they would just use "some service they have access to" as the storage layer. But it looks like that isn't possible, and I imagine adding even a general-purpose version of something like that would be a big lift. Is that idea worth documenting in a feature request? Or do you think it's too heavy?
    Chris White

    2 years ago
    I don’t think it would be too difficult to implement a generic “WebServer” storage type as you describe honestly; if you want to open a feature request for it we can discuss whether there are any gotchas!
    james.lamb

    2 years ago
    ok great, thank you! I'll open that up in a bit.
    For those finding this from Slack search, I've opened https://github.com/PrefectHQ/prefect/issues/2835