Hello all, I would like to ask if you could help ...
# prefect-community
j
Hello all, I would like to ask if you could help me with a problem I'm having in creating a deployment with an SFTP server as remote storage? I'm using Prefect server version 2.6.4 on Windows 10. Specifically, I locally created an example project with the following structure:
Copy code
C:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
├───.idea
│       some files
│           
├───prefect_flows
│       prefect_flows.py
│       
└───prefect_tasks
        prefect_tasks.py
In addition, I created a remote storage block via the UI named
sftp-server
and the base path
<sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>
. After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder
prefect_flows
with the file
prefect_flows.py
is uploaded to the
api_flow
directory of the SFTP server, not the folder
prefect_tasks
. Also, Prefect gives the following error:
Copy code
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
  self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!
1
Traceback:
Copy code
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"                   
Found flow 'api-flow'
Default '.prefectignore' file written to C:\Users\Testbenutzer\prefect_test_project\.prefectignore
Traceback (most recent call last):
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\base_events.py", line 646, in run_until_complete
    return future.result()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\deployment.py", line 853, in build
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 720, in build_from_flow
    await deployment.upload_to_storage()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 572, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\filesystems.py", line 359, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\spec.py", line 819, in put_file
    callback.relative_update(segment_len)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
    self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
Python code:
Copy code
# prefect_flows.py

from prefect import flow
from prefect_tasks.prefect_tasks import call_api


@flow
def api_flow(url):
    fact_json = call_api(url)
    print(fact_json)
	
	

# prefect_tasks.py

import requests
from prefect import task


@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()
Storage block definition:
q
TLDR:
pip install fsspec>=2022.10.0
As you can see in the traceback whenever a file gets written to the remote the progress is tracked via a callback that adds the value returned by
AbstractFileSystem.open().write
to a counter. It is assumed that the value represents the number of bytes written to the buffer. However, in the case of
fsspec.implementations.sftp.SFTPFileSystem
the buffer (
paramiko.file.BufferedFile
) returns
None
when
write
is called, which leads to an exception when
fsspec.spec.AbstractBufferedFile
attempts to add
None
to an int counter. The offending segment changed in
fsspec==2022.10.0
(released 2022/10/19), bumping `fsspec`'s version seems to solve the problem.
🙌 1
🙏 1
j
Thank you so much @Q, that solved the problem! And also thank you very much for the explanation!