james.lamb
06/19/2020, 4:35 PM
Is there any way to write a custom Storage class that extends one of the Prefect built-in Storage classes?

Background: I'm trying to extend prefect.environments.storage.S3 to add some custom logic. I don't need to change anything in the interface, but I want to change the behavior of one or more methods in a use-case-specific way. Code samples and more context in thread.
from prefect.environments.storage import S3

class SomeCustomS3Storage(S3):

    def __init__(self, *args, **kwargs):
        print("hello from Chicago")
        super().__init__(*args, **kwargs)
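For context, the subclass itself behaves as expected locally; a quick check (the bucket name here is made up):

s = SomeCustomS3Storage(bucket="some-bucket")  # prints "hello from Chicago"
isinstance(s, S3)  # True -- the interface is unchanged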
import os

import prefect
from prefect import task, Flow
from prefect.environments import KubernetesJobEnvironment

PROJECT_NAME = "dask-iz-ok"
PREFECT_TEST_BUCKET = os.environ["PREFECT_TEST_BUCKET"]

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("plase-work", tasks=[hello_task])
flow.storage = SomeCustomS3Storage(
    bucket=PREFECT_TEST_BUCKET
)
env = KubernetesJobEnvironment(
    job_spec_file="prefect-flow-run.yaml",
    metadata={
        "image": "prefecthq/prefect:all_extras-0.12.0"
    }
)

flow.register(project_name=PROJECT_NAME)
ValueError: Flow could not be deserialized successfully. Error was: TypeError('not all arguments converted during string formatting')

Working through the Prefect code base, I think my issue is coming from the fact that flow serialization / deserialization is based on the class _name_: https://github.com/PrefectHQ/prefect/blob/d52cc24981a43ea12b7c6d10c6280aa24aa3efc0/src/prefect/serialization/storage.py#L105

Since I'm not changing the schema for S3 at all (just its methods), I tried this horrible thing just to see what would happen:
from prefect.serialization.storage import StorageSchema
from prefect.serialization.storage import S3Schema

StorageSchema.type_schemas.update({
    "SomeCustomS3Storage": S3Schema
})
This gets past the validation in my local call to flow.register(), but since it doesn't patch anything in the version of prefect running in Prefect Cloud, I get this ClientError:
ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': "Invalid flow: {'storage': {'type': ['Unsupported value: SomeCustomS3Storage']}}", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
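To make the name-based dispatch concrete, here's a minimal sketch (my own illustration, assuming prefect 0.12.x and its marshmallow-based schemas, not code from the library):

from prefect.environments.storage import S3
from prefect.serialization.storage import StorageSchema

schema = StorageSchema()

# StorageSchema is a "one of" schema: type_schemas maps class names to
# concrete schemas, so any class name missing from this dict can't round-trip
print(sorted(schema.type_schemas.keys()))  # includes "Docker", "Local", "S3", ...

# dump() records the class name in the "type" field, and load() uses that
# same field to pick a schema, which is why "SomeCustomS3Storage" is rejected
serialized = schema.dump(S3(bucket="some-bucket"))
print(serialized["type"])  # "S3"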
I see now that flow.serialize() is just a dictionary representation of the flow, and that prefect needs to be able to recreate instances of the flow classes from that dictionary:
import json
json.dumps(flow.serialize())
'{"name": "plase-work", "type": "prefect.core.flow.Flow", "schedule": null, "parameters": [], "tasks": [{"inputs": {}, "retry_delay": null, "cache_key": null, "outputs": "typing.Any", "tags": [], "name": "hello_task", "skip_on_upstream_skip": true, "cache_validator": {"fn": "prefect.engine.cache_validators.never_use", "kwargs": {}}, "timeout": null, "trigger": {"fn": "prefect.triggers.all_successful", "kwargs": {}}, "type": "prefect.tasks.core.function.FunctionTask", "max_retries": 0, "slug": "df28a210-ef8b-4551-b386-a32cd4e436c5", "cache_for": null, "auto_generated": false, "__version__": "0.12.0"}], "edges": [], "reference_tasks": [], "environment": {"labels": ["s3-flow-storage"], "executor_kwargs": {}, "metadata": {}, "executor": "prefect.engine.executors.LocalExecutor", "__version__": "0.12.0", "type": "RemoteEnvironment"}, "__version__": "0.12.0", "storage": {"bucket": "prefect-d94f436a-25b1-1699546c3", "key": null, "client_options": null, "secrets": [], "flows": {"plase-work": "plase-work/2020-06-19t16-53-47-549940-00-00"}, "__version__": "0.12.0", "type": "SomeCustomS3Storage"}}'
Chris White
06/19/2020, 4:57 PM
Storage is the interface through which Prefect loads your custom Flow code. Before we’ve loaded your code though, we can’t use your custom code! (it’s a classic chicken-and-egg problem)

Storage is the one thing that users must use official library versions of.
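To illustrate the chicken-and-egg, here's a rough sketch of the loading path (a simplification for this thread, not Prefect's exact internals):

from prefect.serialization.storage import StorageSchema

def load_flow_from_metadata(serialized_flow: dict):
    # step 1: rebuild the Storage object from its serialized "type". Only
    # classes that ship with the official library can be rebuilt here, since
    # SomeCustomS3Storage isn't importable inside Prefect Cloud or the agents
    storage = StorageSchema().load(serialized_flow["storage"])

    # step 2: only now can the user's flow code be pulled down and run
    flow_location = serialized_flow["storage"]["flows"][serialized_flow["name"]]
    return storage.get_flow(flow_location)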
james.lamb
06/19/2020, 5:02 PM
What I was imagining was a Storage class where build() builds the flow locally and pushes it over the wire to a REST API, and where .get_flow() hits that same REST API to read the flow back in. That way, flow runs wouldn't have to have AWS creds or even know about AWS; they would just use "some service they have access to" as the storage layer.
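A purely hypothetical sketch of that idea (every class name, endpoint, and payload below is made up for illustration, and as discussed above, Cloud's validation would reject the unknown type):

import cloudpickle
import requests

from prefect.environments.storage import Storage

class RestStorage(Storage):
    """Hypothetical Storage backed by an arbitrary REST service."""

    def __init__(self, api_url: str, **kwargs):
        self.api_url = api_url
        self.flows = {}    # flow name -> storage location, like built-in storage
        self._flows = {}   # flow name -> Flow object, held until build()
        super().__init__(**kwargs)

    def add_flow(self, flow) -> str:
        location = flow.name
        self.flows[flow.name] = location
        self._flows[flow.name] = flow
        return location

    def build(self) -> "Storage":
        # push each flow's pickled bytes to the (made-up) service
        for name, flow in self._flows.items():
            requests.post(
                f"{self.api_url}/flows/{name}",
                data=cloudpickle.dumps(flow),
            )
        return self

    def get_flow(self, flow_location: str):
        # pull the bytes back and rebuild the Flow object
        resp = requests.get(f"{self.api_url}/flows/{flow_location}")
        return cloudpickle.loads(resp.content)

    def __contains__(self, obj) -> bool:
        return obj in self.flows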
But it looks like that isn't possible, and I imagine adding even a general-purpose version of something like that would be a big lift. Is that idea worth documenting in a feature request? Or do you think it's too heavy?

Chris White
06/19/2020, 5:33 PM

james.lamb
06/19/2020, 6:03 PM