sbrabez
06/26/2023, 8:50 AMprefect.filesystems
that use fsspec to point the configuration of my S3 bucket via the keyword argument settings=
like below:
from prefect import task, flow
from prefect.context import get_run_context
from prefect.filesystems import RemoteFileSystem
bucket_endpoint = "..."
access_key = "..."
secret_key = "..."
bucket_name = "s3-prefect-results-bucket"
s3 = RemoteFileSystem(basepath=f"s3://{bucket_name}",
settings={"client_kwargs": {"endpoint_url": bucket_endpoint}, "key": access_key, "secret": secret_key})
@task(persist_result=True)
def task_persisted(name):
ctx = get_run_context()
task_id = ctx.task_run.id
ret = f"Task id persisted {task_id} by {name}"
return ret
@flow(name="persist_flow", result_storage=s3)
def persist_flow(server_id: int):
result = task_persisted("sbrabez")
print(f"Result persisted: {result}")
return 0
But when I executed it, it reports the following errors. i.e. It seems it cannot PubObject
(S3 API) and the problem seems located in the aiobotocore
client side. Any ideas what’s wrong here or what I’m doing wrong? My S3 Credentials are OK here.
OSError: [Errno 22] Invalid Argument.
| flow_name:persist_flow | flow_run_name:important-dragonfly | flow_run_id:211feee3-4ab1-4b81-86ea-8e76856ed500 | level_name:ERROR | message:Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):
File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
return await func(*args, **kwargs)
File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.
The above exception was the direct cause of the following exception:
OSError: [Errno 22] Invalid Argument.\n')
Traceback (most recent call last):
File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
return await func(*args, **kwargs)
File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "persist.py", line 32, in <module>
persist_flow(server_id=19959)
File "/opt/prefect/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
return enter_flow_run_engine_from_flow_call(
File "/opt/prefect/lib/python3.8/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
FYI, I’m using the Prefect version 2.9.0
Version: 2.9.0
API version: 0.8.4
Python version: 3.8.10
Git commit: 69f57bd5
Built: Thu, Mar 30, 2023 1:08 PM
OS/Arch: linux/x86_64
Profile: objectStorage
Server type: server
I’ll appreciate if you can point me any example or resource to configure this persistent result feature via S3. Thanks for your assistance, Cheers! 🤝sbrabez
07/04/2023, 3:21 PM