# prefect-community
r
I am attempting to create a custom storage block (Digital Ocean uses an `https://` prefix, but `s3://` is hardcoded in the S3 filesystem object). I did this by copying the S3 storage object code and removing the hardcoded prefix while leaving everything else the same (except the name and credential arguments). The block successfully registers to Prefect Cloud. However, when I try to attach it as the storage block I get the error in the comments. From what I can tell this is happening during the validation step. At some point Prefect looks up the `Block` type in the registry via:
Copy code
from prefect.utilities.dispatch import lookup_type, get_registry_for_type
from prefect.blocks.core import Block
registry = get_registry_for_type(Block)
print(registry)
which returns all the Prefect-defined blocks. If I try to build a deployment, it fails because the custom filesystem class I created does not show up in that registry. Also, the `RemoteFileSystem` object currently stores all "settings" as plain text, which isn't ideal because I'm storing access credentials. Thoughts on the best way to proceed?
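For reference, the failing lookup can be reproduced directly with the dispatch key from the error below (`dostorage`):
Copy code
from prefect.utilities.dispatch import lookup_type
from prefect.blocks.core import Block

# deployment builds resolve the storage block through this lookup;
# since the custom class was never imported/registered, the key is
# missing and this raises the KeyError shown in the trace
lookup_type(Block, "dostorage")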
stack trace:
Copy code
Traceback (most recent call last):
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 193, in wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/utilities/asyncutils.py", line 140, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/cli/deployment.py", line 449, in build
    template = await Block.load(storage_block)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/blocks/core.py", line 514, in load
    return cls._from_block_document(block_document)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/blocks/core.py", line 401, in _from_block_document
    else cls.get_block_class_from_schema(block_document.block_schema)
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/blocks/core.py", line 432, in get_block_class_from_schema
    return lookup_type(cls, block_schema_to_key(schema))
  File "/Users/riomcmahon/opt/anaconda3/envs/goodkiwi_env/lib/python3.7/site-packages/prefect/utilities/dispatch.py", line 187, in lookup_type
    f"No class found for dispatch key {dispatch_key!r} in registry for type "
KeyError: "No class found for dispatch key 'dostorage' in registry for type 'Block'."
Solution: I ended up needing to define a Prefect collection and install it into my Python environment.
🙌 1
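For anyone finding this later, a rough sketch of what that collection setup might look like (the package name, module layout, and versions here are just examples):
Copy code
# setup.py for a minimal collection package exposing the DOStorage block
# (e.g. defined in prefect_dostorage/__init__.py)
from setuptools import find_packages, setup

setup(
    name="prefect-dostorage",  # hypothetical name
    version="0.1.0",
    packages=find_packages(),
    install_requires=["prefect>=2.0", "s3fs"],
    # the entry point is what gets the module imported at Prefect startup,
    # so the Block subclass lands in the dispatch registry
    entry_points={
        "prefect.collections": [
            "prefect_dostorage = prefect_dostorage",
        ]
    },
)

# then, in the environment that builds deployments:
#   pip install -e .
#   prefect block register -m prefect_dostorage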
a
nice work figuring that out! one user has recently contributed this https://github.com/PrefectHQ/prefect/pull/6344 in case you would also like to contribute your Digital Ocean Block
it's a pretty straightforward interface and given you already figured it out, might be useful to submit a PR to the main package - up to you :)
r
@Anna Geller - I am struggling a little bit to actually implement it because DigitalOcean uses an `https://` prefix instead of `s3://` (although it is S3-compatible). Based on this, you should be able to pass in the `endpoint_url` client kwarg to fix it; however, the `RemoteFileSystem` definition automatically infers the scheme from the URL provided, so it returns an `fsspec.implementations.http.HTTPFileSystem` implementation instead of the `s3fs.core.S3FileSystem` implementation I need. Any thoughts on fixing this? I'm manually editing the prefix to `s3://` to "trick" the `RemoteFileSystem.filesystem` property, but this is pretty hacky.
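To illustrate the scheme dispatch (assuming `s3fs` is installed; the endpoint follows DO's `https://<region>.digitaloceanspaces.com` pattern):
Copy code
import fsspec

# fsspec dispatches on the URL scheme alone, so an https:// basepath
# resolves to the generic HTTP client...
http_fs = fsspec.filesystem("https")  # fsspec.implementations.http.HTTPFileSystem

# ...while explicitly requesting "s3" yields the S3-compatible client,
# pointed at the Spaces endpoint via client_kwargs
s3_fs = fsspec.filesystem(
    "s3",
    client_kwargs={"endpoint_url": "https://sfo3.digitaloceanspaces.com"},
)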
The main difference between the PR linked above and this is that `smb` is an accepted scheme (see here: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/registry.py#L179-L181), but since DO uses `https` it gets wonky.
The relevant (and seemingly working) code is:
Copy code
from typing import Optional

from pydantic import Field, SecretStr

from prefect.blocks.core import Block
from prefect.filesystems import ReadableFileSystem, RemoteFileSystem, WritableFileSystem


class DOStorage(ReadableFileSystem, WritableFileSystem, Block):
    """
    Store data as a file on Digital Ocean Spaces.
    """

    _block_type_name = "DOStorage"
    _logo_url = "https://cdn.iconscout.com/icon/free/png-256/digital-ocean-3521385-2944829.png"

    bucket_path: str = Field(
        ...,
        description="A DigitalOcean bucket path (without `https://` prefix)",
        example="my-bucket/a-directory-within",
    )
    region_name: str = Field(..., description="Digital Ocean region name", example="sfo3")
    access_key: SecretStr = Field(None, title="DO Access Key ID")
    access_key_secret: SecretStr = Field(None, title="DO Secret Access Key")

    _remote_file_system: Optional[RemoteFileSystem] = None

    @property
    def basepath(self) -> str:
        # prefixing with s3:// gets fsspec to properly infer S3 object storage
        return f"s3://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        settings = {}

        # unpack secrets
        if self.access_key:
            settings["key"] = self.access_key.get_secret_value()
        if self.access_key_secret:
            settings["secret"] = self.access_key_secret.get_secret_value()

        # point the S3 client at the Spaces regional endpoint
        # (https://<region>.digitaloceanspaces.com) instead of AWS
        settings["client_kwargs"] = {
            "endpoint_url": f"https://{self.region_name}.digitaloceanspaces.com",
            "region_name": self.region_name,
        }

        # build the remote filesystem object with the s3:// basepath
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath, settings=settings
        )
        return self._remote_file_system

    # delegate reads/writes to the underlying RemoteFileSystem so the
    # ReadableFileSystem/WritableFileSystem interfaces are satisfied
    async def read_path(self, path: str) -> bytes:
        return await self.filesystem.read_path(path)

    async def write_path(self, path: str, content: bytes) -> None:
        await self.filesystem.write_path(path, content)
wherein in the `basepath` property I just manually prepend `s3://`. A less hacky solution, I think, would be to modify the `RemoteFileSystem` class to accept a kwarg that lets users manually specify the fsspec scheme. Thoughts on that?
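For completeness, saving a configured instance looks roughly like this (the credentials are placeholders):
Copy code
# register a named document in Prefect Cloud so deployments can reference it
storage = DOStorage(
    bucket_path="my-bucket/a-directory-within",
    region_name="sfo3",
    access_key="...",          # SecretStr coerces plain strings
    access_key_secret="...",
)
storage.save("do-storage", overwrite=True)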
a
in that case, perhaps it makes sense to make a PR to fsspec first to fix that?
it's the first remote FS I've seen that doesn't follow the s3://, gcs://, az:// convention
I'd prefer not to modify the RemoteFileSystem block, so contributing without making changes to it would be easier
👍 1
r
Sounds good. It is pretty annoying that DO doesn't conform to that naming convention, but so it goes 🤷‍♀️; either way, the above code snippet works as a workaround for anyone running into this in the future.
🙌 1
I'm also not actually sure this should be an fsspec change, though - `RemoteFileSystem` (shown in the following snippet) is the one "incorrectly" making the inference - heavy finger quotes around "incorrectly" because ultimately it is DO's issue in using https rather than s3 as the scheme.
Copy code
scheme, _, _, _, _ = urllib.parse.urlsplit(self.basepath)
...
self._filesystem = fsspec.filesystem(scheme, **self.settings)
If `fsspec.filesystem()` were called with the `s3` scheme, it wouldn't be an issue; I don't really think there is any way to make a PR to fsspec that would cover this behavior, since fsspec has users explicitly provide the scheme.
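For concreteness, a rough sketch of that kwarg idea - not actual Prefect code, and it assumes `filesystem` stays an overridable property as the snippet above suggests:
Copy code
import urllib.parse
from typing import Optional

import fsspec

from prefect.filesystems import RemoteFileSystem


class SchemeOverrideRemoteFileSystem(RemoteFileSystem):
    # e.g. "s3" for an S3-compatible store reached through an https:// endpoint
    scheme_override: Optional[str] = None

    @property
    def filesystem(self):
        # fall back to the inferred scheme when no override is given
        scheme = self.scheme_override or urllib.parse.urlsplit(self.basepath).scheme
        return fsspec.filesystem(scheme, **self.settings)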