Jens Freund
10/23/2022, 8:01 PMC:\USERS\TESTBENUTZER\PREFECT_TEST_PROJECT
├───.idea
│ some files
│
├───prefect_flows
│ prefect_flows.py
│
└───prefect_tasks
prefect_tasks.py
In addition, I created a remote storage block via the UI named sftp-server
and the base path <sftp://ACTUAL-SFTP-URL/prefect_flows/api_flow>
.
After that, I used the following command to create a deployment:
PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
However, only the folder prefect_flows
with the file prefect_flows.py
is uploaded to the api_flow
directory of the SFTP server, not the folder prefect_tasks
. Also, Prefect gives the following error:
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
I would write the full traceback into the thread, as well as the code of the two Python files and a screenshot of the storage block definition (without the actual SFTP URL and login credentials). If anyone had any idea, what I could be doing wrong here, that would be a great help! Many thanks in advance!PS C:\Users\Testbenutzer\prefect_test_project> prefect deployment build "C:\Users\Testbenutzer\prefect_test_project\prefect_flows\prefect_flows.py:api_flow" -n "ftp-test" -q "ftp-test" -sb "remote-file-system/sftp-server"
Found flow 'api-flow'
Default '.prefectignore' file written to C:\Users\Testbenutzer\prefect_test_project\.prefectignore
Traceback (most recent call last):
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\utilities\asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\asyncio\base_events.py", line 646, in run_until_complete
return future.result()
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
return await func(*args)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\cli\deployment.py", line 853, in build
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 720, in build_from_flow
await deployment.upload_to_storage()
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\deployments.py", line 572, in upload_to_storage
file_count = await self.storage.put_directory(
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\prefect\filesystems.py", line 359, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\spec.py", line 819, in put_file
callback.relative_update(segment_len)
File "C:\Users\Testbenutzer\anaconda3\envs\prefect_test_project_v2\lib\site-packages\fsspec\callbacks.py", line 65, in relative_update
self.value += inc
TypeError: unsupported operand type(s) for +=: 'int' and 'NoneType'
An exception occurred.
Python code:
# prefect_flows.py
from prefect import flow
from prefect_tasks.prefect_tasks import call_api
@flow
def api_flow(url):
fact_json = call_api(url)
print(fact_json)
# prefect_tasks.py
import requests
from prefect import task
@task
def call_api(url):
response = requests.get(url)
print(response.status_code)
return response.json()
Storage block definition:Q
10/24/2022, 11:27 AMpip install fsspec>=2022.10.0
As you can see in the traceback whenever a file gets written to the remote the progress is tracked via a callback that adds the value returned by AbstractFileSystem.open().write
to a counter.
It is assumed that the value represents the number of bytes written to the buffer.
However, in the case of fsspec.implementations.sftp.SFTPFileSystem
the buffer (paramiko.file.BufferedFile
) returns None
when write
is called, which leads to an exception when fsspec.spec.AbstractBufferedFile
attempts to add None
to an int counter.
The offending segment changed in fsspec==2022.10.0
(released 2022/10/19), bumping `fsspec`'s version seems to solve the problem.Jens Freund
10/24/2022, 11:57 AM