# ask-community
Hi everyone and Prefect community 👋, I'm trying to configure persistent results for my flows and tasks so they are stored in an S3 bucket. For the PoC, I'm using `RemoteFileSystem` from `prefect.filesystems`, which uses fsspec under the hood; I point it at my S3 bucket's configuration via the `settings=` keyword argument, like below:
```python
from prefect import task, flow
from prefect.context import get_run_context
from prefect.filesystems import RemoteFileSystem

bucket_endpoint = "..."
access_key = "..."
secret_key = "..."
bucket_name = "s3-prefect-results-bucket"
s3 = RemoteFileSystem(
    basepath=f"s3://{bucket_name}",
    settings={
        "client_kwargs": {"endpoint_url": bucket_endpoint},
        "key": access_key,
        "secret": secret_key,
    },
)

@task(persist_result=True)
def task_persisted(name):
    ctx = get_run_context()
    task_id = ctx.task_run.id
    return f"Task id persisted {task_id} by {name}"

@flow(name="persist_flow", result_storage=s3)
def persist_flow(server_id: int):
    result = task_persisted("sbrabez")
    print(f"Result persisted: {result}")
    return 0

persist_flow(server_id=19959)
```
But when I execute it, it reports the following error: it seems it cannot `PutObject` (S3 API), and the problem appears to be on the `aiobotocore` client side. Any idea what's wrong here, or what I'm doing wrong? My S3 credentials are correct.
```
OSError: [Errno 22] Invalid Argument.
 | flow_name:persist_flow | flow_run_name:important-dragonfly | flow_run_id:211feee3-4ab1-4b81-86ea-8e76856ed500 | level_name:ERROR | message:Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):
File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.

The above exception was the direct cause of the following exception:

OSError: [Errno 22] Invalid Argument.\n')
Traceback (most recent call last):
  File "/opt/prefect/lib/python3.8/site-packages/s3fs/core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
  File "/opt/prefect/lib/python3.8/site-packages/aiobotocore/client.py", line 371, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidArgument) when calling the PutObject operation: Invalid Argument.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "persist.py", line 32, in <module>
    persist_flow(server_id=19959)
  File "/opt/prefect/lib/python3.8/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/opt/prefect/lib/python3.8/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/opt/prefect/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
```
FYI, I'm using Prefect version `2.9.0`:
```
Version:             2.9.0
API version:         0.8.4
Python version:      3.8.10
Git commit:          69f57bd5
Built:               Thu, Mar 30, 2023 1:08 PM
OS/Arch:             linux/x86_64
Profile:             objectStorage
Server type:         server
```
I'd appreciate it if you could point me to any example or resource for configuring this persistent-results feature with S3. Thanks for your assistance, cheers! 🤝
About that topic, I had to do some custom development to make it work and be able to persist results remotely to an S3 bucket. Currently, out of the box it's limited to S3 on AWS, but I wanted to make it work with on-premise S3 and other providers as well.
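One thing worth checking with non-AWS endpoints: many on-premise S3 implementations (MinIO, Ceph RGW, etc.) only accept path-style addressing, while the client may default to virtual-hosted-style requests, which can surface as `InvalidArgument` on `PutObject`. Since `RemoteFileSystem` forwards `settings` straight to fsspec's s3fs backend, you can pass s3fs's `config_kwargs` (handed to the botocore client config) to force path-style URLs. A minimal sketch, assuming a placeholder endpoint and credentials; the `addressing_style` option is standard botocore S3 config, but whether it resolves this particular error on your storage is an assumption:

```python
# Hypothetical settings dict for an S3-compatible endpoint.
# Endpoint URL and credentials below are placeholders, not real values.
settings = {
    "key": "ACCESS_KEY",
    "secret": "SECRET_KEY",
    "client_kwargs": {"endpoint_url": "https://s3.internal.example.com"},
    # Force path-style URLs (bucket name in the path, not the hostname);
    # many non-AWS S3 implementations reject virtual-hosted-style requests.
    "config_kwargs": {"s3": {"addressing_style": "path"}},
}

# These kwargs would then be forwarded to s3fs by RemoteFileSystem, e.g.:
# s3 = RemoteFileSystem(basepath="s3://my-bucket", settings=settings)
```

If the endpoint sits behind plain HTTP, s3fs also accepts `"use_ssl": False` in the same dict.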