https://prefect.io logo
Title
m

Mike He

09/16/2022, 9:46 AM
I have tried to patch the
prefect.filesystem.RemoteFileSystem
with the following code the other day. But after some digging into the
fsspec
source code I am not sure the problem should be classified as directly related with
Prefect
or the
fsspec
. Also the patching is a little bit ugly and I don't want to patch it that way for every agent environments XD. So I am asking here, if that is the case for me only with Windows platform? And if the error is universal, will the Prefect Team fix it in the future or waiting
fsspec
to fix that? BTW: I have also raised an Issue to
fsspec
here Summary: Added 2 lines to
RemoteFileSystem.get_directory
, as well as a new function
RemoteFileSystem.get_directory_ftp
# prefect/filesystem.py
...

class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):

    ...

    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local direcotry.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        if urllib.parse.urlsplit(self.basepath).scheme == 'ftp':  # Add
            return await self.get_directory_ftp(from_path, local_path)  # Add

        return self.filesystem.get(from_path, local_path, recursive=True)

    async def get_directory_ftp(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        from_path_raw = urllib.parse.urlsplit(from_path).path
        for file_directory_item in <http://self.filesystem.ls|self.filesystem.ls>(from_path):
            type_ = file_directory_item["type"]
            name = file_directory_item["name"]
            other_path = name[len(from_path_raw) :]
            if other_path.startswith("/"):  # Change to relative path
                other_path = other_path[1:]
            dest_path = Path(local_path).joinpath(other_path)
            if type_ == "directory":
                if (not dest_path.exists()) or dest_path.is_file():
                    dest_path.mkdir()
                await self.get_directory_ftp(name, dest_path)
            if type_ == "file":
                try:
                    self.filesystem.get_file(name, dest_path)
                except:
                    print(f"FTP Error downloading {name} to {dest_path}")