Jens Freund
10/23/2022, 8:01 PM
C:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
├───.idea
│       some files
│
├───prefect_flows
│       prefect_flows.py
│
└───prefect_tasks
        prefect_tasks.py
In addition, I created a remote storage block named sftp-server via the UI, with the base path <sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>.
After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder prefect_flows with the file prefect_flows.py is uploaded to the api_flow directory of the SFTP server, not the folder prefect_tasks. Also, Prefect gives the following error:
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
    self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I will post the full traceback in the thread, along with the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone has an idea what I could be doing wrong here, that would be a great help! Many thanks in advance!
Jens Freund
10/23/2022, 8:05 PM
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
Found flow 'api-flow'
Default '.prefectignore' file written to C:\Users\Testbenutzer\prefect_test_project\.prefectignore
Traceback (most recent call last):
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
    return fn(*args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 201, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 152, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\base_events.py", line 646, in run_until_complete
    return future.result()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\deployment.py", line 853, in build
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 720, in build_from_flow
    await deployment.upload_to_storage()
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 572, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\filesystems.py", line 359, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\spec.py", line 819, in put_file
    callback.relative_update(segment_len)
  File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
    self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
Python code:
# prefect_flows.py
from prefect import flow
from prefect_tasks.prefect_tasks import call_api

@flow
def api_flow(url):
    fact_json = call_api(url)
    print(fact_json)

# prefect_tasks.py
import requests
from prefect import task

@task
def call_api(url):
    response = requests.get(url)
    print(response.status_code)
    return response.json()
Storage block definition:
Q
10/24/2022, 11:27 AM
pip install "fsspec>=2022.10.0"
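To see whether an environment is affected before upgrading, the installed version can be compared against 2022.10.0. This is only a sketch: `parse_version`, `is_at_least`, and `installed_fsspec_version` are hypothetical helper names, and the parsing assumes plain dotted numeric version strings (no dev or rc suffixes).

```python
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Turn a dotted numeric version such as '2022.8.2' into a tuple of ints.

    Assumption: every component is a plain integer (no 'rc1', '.dev0', ...).
    """
    return tuple(int(part) for part in text.split("."))


def is_at_least(installed, minimum="2022.10.0"):
    """Compare versions numerically; string comparison would rank '8' above '10'."""
    return parse_version(installed) >= parse_version(minimum)


def installed_fsspec_version():
    """Return the installed fsspec version string, or None if it is not installed."""
    try:
        return version("fsspec")
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_fsspec_version()
    if found is None:
        print("fsspec is not installed in this environment")
    else:
        print(found, "ok" if is_at_least(found) else "older than 2022.10.0")
```

The comparison uses integer tuples because a plain string comparison would treat "2022.8.2" as newer than "2022.10.0".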
As you can see in the traceback, whenever a file is written to the remote, the progress is tracked via a callback that adds the value returned by AbstractFileSystem.open().write to a counter.
It is assumed that this value is the number of bytes written to the buffer.
However, in the case of fsspec.implementations.sftp.SFTPFileSystem, the buffer (paramiko.file.BufferedFile) returns None when write is called, which leads to an exception when the callback invoked from put_file in fsspec/spec.py attempts to add None to its int counter.
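The failure mode can be reproduced without an SFTP server. What follows is a minimal sketch, not fsspec's actual code: `ProgressCounter`, `NoneReturningBuffer`, `copy_in_chunks`, and `copy_in_chunks_safe` are hypothetical stand-ins for the callback, the paramiko buffer, and the upload loop. The "safe" variant is just one defensive approach and is not claimed to be the exact change made upstream.

```python
import io


class ProgressCounter:
    """Hypothetical stand-in for a progress callback with an int counter."""

    def __init__(self):
        self.value = 0

    def relative_update(self, inc):
        # Same operation as in the traceback; raises TypeError if inc is None.
        self.value += inc


class NoneReturningBuffer(io.BytesIO):
    """Hypothetical buffer whose write() returns None instead of a byte count."""

    def write(self, data):
        super().write(data)
        return None


def copy_in_chunks(data, target, callback, chunk_size=4):
    """Write data in chunks and trust write()'s return value for progress."""
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        written = target.write(chunk)
        callback.relative_update(written)


def copy_in_chunks_safe(data, target, callback, chunk_size=4):
    """Same loop, but fall back to len(chunk) when write() returns None."""
    for start in range(0, len(data), chunk_size):
        chunk = data[start:start + chunk_size]
        written = target.write(chunk)
        if written is None:
            written = len(chunk)
        callback.relative_update(written)


if __name__ == "__main__":
    try:
        copy_in_chunks(b"abcdefgh", NoneReturningBuffer(), ProgressCounter())
    except TypeError as exc:
        print("reproduced:", exc)

    counter = ProgressCounter()
    copy_in_chunks_safe(b"abcdefgh", NoneReturningBuffer(), counter)
    print("bytes counted:", counter.value)
```

With an ordinary `io.BytesIO` target, whose `write` does return a byte count, the first loop runs without error. That is why the problem only shows up with file objects like the one described here.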
The offending segment changed in fsspec==2022.10.0 (released 2022/10/19), so bumping `fsspec`'s version seems to solve the problem.
Jens Freund
10/24/2022, 11:57 AM