Sean Davis
07/17/2025, 7:56 PM
Marvin
07/17/2025, 7:56 PM
Marvin
07/17/2025, 7:58 PM
```python
from prefect import flow, task


@task(
    persist_result=True,
    result_storage="s3-bucket/my-bucket-block",  # Slug of your pre-configured S3 Bucket block
)
def my_task():
    # Your task logic here
    return "Some result to persist"


@flow
def my_flow():
    result = my_task()
    return result
```
Key points about this example:
1. `persist_result=True` tells Prefect to persist the task's return value.
2. `result_storage="s3-bucket/my-bucket-block"` specifies which storage block to write results to. Replace `my-bucket-block` with your actual block name.
3. The string is a block slug of the form `"<block-type-slug>/<block-name>"`. For an S3 Bucket block from `prefect-aws` the type slug is `s3-bucket`, and the block name is the name you gave the block when configuring it. `prefect blocks ls` shows the full slug for each block.
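To make the slug format concrete, here is a small illustrative helper that splits a slug into its two parts. `split_block_slug` is hypothetical and not part of Prefect's API; Prefect does its own parsing when it loads the block.

```python
def split_block_slug(slug: str) -> tuple[str, str]:
    """Split a slug like "s3-bucket/my-bucket-block" into (block type slug, block name).

    Hypothetical helper for illustration only.
    """
    block_type_slug, sep, block_name = slug.partition("/")
    if not sep or not block_type_slug or not block_name:
        raise ValueError(f"expected '<block-type-slug>/<block-name>', got {slug!r}")
    return block_type_slug, block_name


print(split_block_slug("s3-bucket/my-bucket-block"))  # ('s3-bucket', 'my-bucket-block')
```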
Before running this code, make sure you have already created an S3 Bucket block (from the `prefect-aws` package) in your Prefect workspace. You can create one either through the UI or programmatically:
```python
from prefect_aws import AwsCredentials, S3Bucket

# Example of creating an S3 Bucket block programmatically
s3_block = S3Bucket(
    bucket_name="my-bucket",  # Your bucket name
    bucket_folder="prefect-results",  # Optional prefix inside the bucket
    credentials=AwsCredentials(
        aws_access_key_id="your-access-key",
        aws_secret_access_key="your-secret-key",
    ),
)
s3_block.save("my-bucket-block")
```
When the task runs, Prefect serializes its return value (with a pickle-based serializer by default) and writes it to the bucket configured in the block, so the result can be read back later, for example when a cached result is reused.
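Conceptually, persisting a result means turning the return value into bytes and storing them under a key; retrieving it reverses those steps. The stdlib sketch below shows only that idea, with `pickle` and an in-memory dict standing in for the bucket. The names `fake_bucket`, `persist`, and `retrieve` are hypothetical; Prefect's real result records add metadata and choose their own storage keys.

```python
import pickle

# Stand-in for the S3 bucket: maps storage keys to raw bytes
fake_bucket: dict[str, bytes] = {}


def persist(key: str, value: object) -> None:
    """Serialize a value to bytes and 'upload' it under key."""
    fake_bucket[key] = pickle.dumps(value)


def retrieve(key: str) -> object:
    """'Download' the bytes stored under key and deserialize them."""
    return pickle.loads(fake_bucket[key])


persist("my-task-result", "Some result to persist")
print(retrieve("my-task-result"))  # Some result to persist
```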
Note that you can also configure persistence at the flow level, which will apply to all tasks within that flow unless explicitly overridden:
```python
@flow(persist_result=True, result_storage="s3-bucket/my-bucket-block")
def my_flow():
    result = my_task()
    return result
```
Remember that the credentials the block uses need permission to write to (and read back from) your bucket; on AWS that means IAM permissions such as `s3:PutObject` and `s3:GetObject`.
Sean Davis
07/17/2025, 8:04 PM
```python
@task(result_storage="s3-bucket/osn-bucket", persist_result=True, task_run_name="abc-testing")
def abc_testing() -> str:
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
When I run this code in a flow, I get the following error:
```
16:01:33.731 | ERROR | Task run 'abc-testing' - An error was encountered while committing transaction '0613ec5710a2f902c0a6231d374db506'
Traceback (most recent call last):
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/transactions.py", line 353, in commit
    self.store.persist_result_record(result_record=self._staged_value)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/results.py", line 823, in persist_result_record
    return self._persist_result_record(
        result_record=result_record, holder=holder, _sync=True
    )
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 347, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 329, in result
    return self.future.result(timeout=timeout)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
    return self.__get_result()
  File "/Users/davsean/.pyenv/versions/3.13.3/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 405, in _run_async
    result = await coro
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
    return await task
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 341, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/results.py", line 803, in _persist_result_record
    await call_explicitly_async_block_method(
        ...<4 lines>...
    )
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/compatibility/blocks.py", line 19, in call_explicitly_async_block_method
    return await getattr(block, f"a{method}")(*args, **kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect_aws/s3.py", line 1255, in awrite_path
    await run_sync_in_worker_thread(self._write_sync, path, content)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
        call_with_mark, call, abandon_on_cancel=True, limiter=get_thread_limiter()
    )
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
        func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
    )
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
    return await future
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect_aws/s3.py", line 1219, in _write_sync
    s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/boto3/s3/inject.py", line 642, in upload_fileobj
    return future.result()
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/futures.py", line 264, in result
    raise self._exception
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/tasks.py", line 135, in __call__
    return self._execute_main(kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/tasks.py", line 158, in _execute_main
    return_value = self._main(**kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/upload.py", line 796, in _main
    client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (MissingContentLength) when calling the PutObject operation: Unknown
```
Are you able to help diagnose the problem?
Marvin
07/17/2025, 8:05 PM
Marvin
07/17/2025, 8:05 PM
The error you're seeing (`MissingContentLength`) is related to how the S3 client uploads the result to your bucket. Here are a few things to check:
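1. First, check the `result_storage` string. It is a block slug of the form `"<block-type-slug>/<block-name>"`:
   - `s3-bucket` (the type slug of the S3 Bucket block from `prefect-aws`)
   - followed by `/`
   - followed by the name you gave to your block when you created it
   So `"s3-bucket/osn-bucket"` is the right form for an S3 Bucket block named `osn-bucket`. The traceback also reaches `prefect_aws/s3.py`, which suggests the block was found and loaded, and the failure happens during the upload itself.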
2. Make sure you have properly created and configured your S3 Bucket block. Here's how you can create one in code:
```python
from prefect_aws import AwsCredentials, S3Bucket

s3_block = S3Bucket(
    bucket_name="osn-bucket",  # Your actual bucket name
    credentials=AwsCredentials(
        aws_access_key_id="your-access-key",
        aws_secret_access_key="your-secret-key",
    ),
)
s3_block.save("my-block-name")  # This is the name you'll reference in result_storage
```
Then reference the block from your task by its slug:
```python
from prefect import get_run_logger, task


@task(
    result_storage="s3-bucket/my-block-name",  # Format: "<block-type-slug>/<block-name>"
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
3. If you've already created the block through the UI, you can verify that it exists and loads by running:
```python
from prefect_aws import S3Bucket

# This will raise an error if the block doesn't exist
block = S3Bucket.load("your-block-name")
```
4. Make sure your AWS credentials have the necessary permissions to write to the S3 bucket.
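The `MissingContentLength` error means the server rejected the `PutObject` request because it expected a `Content-Length` header it didn't get. One known cause with S3-compatible (non-AWS) storage is that boto3/botocore 1.36 and later add checksums to uploads by default, which can change how the request body is sent in a way some of those services don't accept. If your bucket isn't on AWS itself, the block's endpoint and client settings are the most likely place to look.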
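If your bucket is on an S3-compatible service, one workaround worth trying (an assumption to verify against your setup; it only applies to botocore 1.36 or later) is to tell botocore to compute and validate checksums only when an operation requires them. botocore reads the `AWS_REQUEST_CHECKSUM_CALCULATION` and `AWS_RESPONSE_CHECKSUM_VALIDATION` environment variables, so setting them before any S3 client is created should be enough:

```python
import os

# Must run before boto3/botocore creates an S3 client, e.g. at the top of your flow script.
# "when_required" limits checksums to operations that require them (botocore >= 1.36).
# setdefault keeps any value you have already exported in your shell or worker environment.
os.environ.setdefault("AWS_REQUEST_CHECKSUM_CALCULATION", "when_required")
os.environ.setdefault("AWS_RESPONSE_CHECKSUM_VALIDATION", "when_required")

print("request checksums:", os.environ["AWS_REQUEST_CHECKSUM_CALCULATION"])
print("response validation:", os.environ["AWS_RESPONSE_CHECKSUM_VALIDATION"])
```

Exporting the same two variables in the shell or worker environment that runs the flow has the same effect.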
Could you confirm:
1. How did you create your S3 block (UI or code)?
2. What's the actual name of your block as configured in Prefect?
This will help me provide more specific guidance to resolve the issue.
Sean Davis
07/17/2025, 8:06 PM
```
prefect blocks ls
Blocks
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ID                                   ┃ Type            ┃ Name          ┃ Slug                       ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ac62712b-962c-453b-b5a9-f748c50fbccc │ AWS Credentials │ osn-bucket    │ aws-credentials/osn-bucket │
│ 51dd9b76-8c60-46e9-9bae-796cfc6db4db │ JSON            │ sra-etl-state │ json/sra-etl-state         │
│ 740ba99d-79e8-42fa-973b-113cf2964414 │ S3 Bucket       │ osn-bucket    │ s3-bucket/osn-bucket       │
└──────────────────────────────────────┴─────────────────┴───────────────┴────────────────────────────┘
```
The block was configured with the UI.
Marvin
07/17/2025, 8:06 PM
Marvin
07/17/2025, 8:07 PM
```python
from prefect import get_run_logger, task


@task(
    result_storage="s3-bucket/osn-bucket",  # Matches the Slug column from `prefect blocks ls`
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
Your `"s3-bucket/osn-bucket"` reference is already the correct slug. This is because:
1. The block type slug for an S3 Bucket block is `s3-bucket`, as shown in the Slug column.
2. `osn-bucket` is the name of your block.
Your AWS Credentials block has the same name, but its slug is `aws-credentials/osn-bucket`, so the two don't collide.
You can verify that the block exists and loads by trying to load it:
```python
from prefect_aws import S3Bucket

# This should work if the block exists and its fields are valid
block = S3Bucket.load("osn-bucket")
```
If you're still getting the `MissingContentLength` error, we might need to check the block's configuration. You can view it in the UI or by printing its attributes after loading it (just be careful not to print any sensitive credentials).
Sean Davis
07/17/2025, 8:12 PM
Marvin
07/17/2025, 8:12 PM
Marvin
07/17/2025, 8:13 PM
```python
from prefect_aws import S3Bucket

block = S3Bucket.load("osn-bucket")

# Print the non-secret settings explicitly instead of looping over every attribute
print("bucket_name:", block.bucket_name)
print("bucket_folder:", repr(block.bucket_folder))
print("region_name:", block.credentials.region_name)

client_params = block.credentials.aws_client_parameters
print("endpoint_url:", client_params.endpoint_url)
print("config:", client_params.config)
```
This will help us verify:
1. The bucket name is correct
2. Any specific path within the bucket that's configured
3. The AWS region being used
4. Other configuration settings
Given that you mentioned the bucket loads correctly but you're still getting the `MissingContentLength` error, it would be helpful to see these configuration details. The error means the server rejected the upload because it expected a `Content-Length` header it didn't get, and with a custom S3 endpoint that can depend on how the client is configured (endpoint, addressing style, signing, and checksum settings).
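If you'd rather share the whole configuration, one option is to turn it into a dict and mask anything that looks secret before printing. The `redact` helper below is a hypothetical stdlib sketch, and the key-name patterns it masks are assumptions, so check its output before posting it anywhere.

```python
from typing import Any

# Substrings that mark a key as sensitive (assumption: adjust for your blocks)
SENSITIVE_MARKERS = ("secret", "token", "password", "access_key")


def redact(value: Any) -> Any:
    """Return a copy of nested dicts/lists with sensitive-looking values masked."""
    if isinstance(value, dict):
        return {
            key: "**********"
            if any(marker in str(key).lower() for marker in SENSITIVE_MARKERS)
            else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


# Sample shaped like an S3 Bucket block's settings (dummy credential values)
config = {
    "bucket_name": "cis240955-1",
    "credentials": {
        "aws_access_key_id": "EXAMPLEKEYID",
        "aws_secret_access_key": "example-secret",
        "aws_client_parameters": {"endpoint_url": "https://nyu1.osn.mghpcc.org"},
    },
}
print(redact(config))
```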
Also, could you share what version of the `prefect-aws` package you're using? You can check with:
```python
import prefect_aws

print(prefect_aws.__version__)
```
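If `prefect_aws.__version__` isn't available in your install, or to also see `boto3` and `botocore` (which do the actual upload), `importlib.metadata` reads versions from the installed package metadata. `installed_versions` is a hypothetical helper name:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages: list[str]) -> dict[str, str]:
    """Map each distribution name to its installed version, or 'not installed'."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = "not installed"
    return result


for name, ver in installed_versions(["prefect", "prefect-aws", "boto3", "botocore"]).items():
    print(f"{name}: {ver}")
```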
This might help us identify if this is a version-specific issue.
Sean Davis
07/17/2025, 8:17 PM
```
S3Bucket(bucket_name='cis240955-1', credentials=AwsCredentials(aws_access_key_id='1V4U7AX6NP8N6DQKNW14', aws_secret_access_key=SecretStr('**********'), aws_session_token=None, profile_name=None, region_name=None, aws_client_parameters=AwsClientParameters(api_version=None, use_ssl=True, verify=None, verify_cert_path=None, endpoint_url='https://nyu1.osn.mghpcc.org', config=None)), bucket_folder='')
```
Marvin
07/17/2025, 8:17 PM
Marvin
07/17/2025, 8:17 PM
```python
from prefect_aws import AwsCredentials, S3Bucket
from prefect_aws.client_parameters import AwsClientParameters

# Create credentials object
credentials = AwsCredentials(
    aws_access_key_id="1V4U7AX6NP8N6DQKNW14",
    aws_secret_access_key="your-secret-key",
    aws_client_parameters=AwsClientParameters(
        endpoint_url="https://nyu1.osn.mghpcc.org",
        # Passed through to botocore's Config for every client the block creates
        config={
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
            # Only add checksums when an operation requires them (needs botocore >= 1.36)
            "request_checksum_calculation": "when_required",
            "response_checksum_validation": "when_required",
        },
    ),
)

# Create S3 bucket block
s3_block = S3Bucket(
    bucket_name="cis240955-1",
    credentials=credentials,
    bucket_folder="",  # Your current setting
)

# Save the block with the same name
s3_block.save("osn-bucket", overwrite=True)
```
Your task can keep referencing the block by its slug:
```python
from prefect import get_run_logger, task


@task(
    result_storage="s3-bucket/osn-bucket",
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
The key changes in this configuration:
1. Passed explicit botocore client settings through `AwsClientParameters.config`
2. Set the S3 addressing style to `"path"`, which custom S3 endpoints often need
3. Set the signature version to `"s3v4"` (SigV4), which S3-compatible storage services generally support
4. Set `request_checksum_calculation` and `response_checksum_validation` to `"when_required"`, so botocore 1.36+ doesn't add the default upload checksums that some S3-compatible services reject
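Also, could you check which versions of `prefect-aws`, `boto3`, and `botocore` you're using? boto3/botocore 1.36 changed the default checksum behavior for uploads, which is a known source of errors with some S3-compatible services, so those versions matter here. You can upgrade `prefect-aws` to the latest version with:
```bash
pip install --upgrade prefect-aws
```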
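To tell whether your installed botocore has the newer checksum defaults, here is a small sketch that compares its version against 1.36. The threshold is an assumption based on the 1.36 release, and `needs_checksum_workaround` is a hypothetical helper name:

```python
from importlib.metadata import PackageNotFoundError, version


def needs_checksum_workaround(botocore_version: str) -> bool:
    """Return True if this botocore version has the newer default checksum behavior.

    Assumption: the change landed in botocore 1.36.0 (shipped alongside boto3 1.36.0).
    """
    major, minor = (int(part) for part in botocore_version.split(".")[:2])
    return (major, minor) >= (1, 36)


try:
    installed = version("botocore")
except PackageNotFoundError:
    installed = None

if installed is None:
    print("botocore is not installed")
else:
    print(f"botocore {installed}: checksum workaround relevant = {needs_checksum_workaround(installed)}")
```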
If you're still getting the error after these changes, we might need to try a few other S3 client configurations specific to OSN storage. Could you also share:
1. The current version of `prefect-aws` you're using
2. Whether you're able to perform basic S3 operations (like uploading a file) outside of Prefect using the same credentials
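For the second check, here is a minimal boto3 sketch that uploads a small object directly, bypassing Prefect. The endpoint and bucket come from your block; the object key `prefect-upload-test.txt` and the helper names are hypothetical, and credentials are assumed to come from boto3's usual sources (environment variables or `~/.aws` files). Running it once without and once with the checksum settings is a quick way to see whether they make the difference.

```python
import io


def client_config_kwargs(checksums_when_required: bool) -> dict:
    """botocore Config options for an S3-compatible endpoint such as OSN."""
    kwargs: dict = {"signature_version": "s3v4", "s3": {"addressing_style": "path"}}
    if checksums_when_required:
        # These options need botocore >= 1.36; older versions reject them
        kwargs["request_checksum_calculation"] = "when_required"
        kwargs["response_checksum_validation"] = "when_required"
    return kwargs


def upload_test_object(endpoint_url: str, bucket: str, checksums_when_required: bool) -> None:
    """Upload a tiny object with plain boto3, without Prefect involved."""
    # Imported lazily so client_config_kwargs works even without boto3 installed
    import boto3
    from botocore.config import Config

    # Credentials come from boto3's default chain (env vars, ~/.aws files, ...)
    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        config=Config(**client_config_kwargs(checksums_when_required)),
    )
    # Same call prefect-aws makes in the traceback (upload_fileobj -> PutObject)
    s3.upload_fileobj(io.BytesIO(b"hello from boto3"), bucket, "prefect-upload-test.txt")
    print(f"upload succeeded (checksums_when_required={checksums_when_required})")


# Example usage (makes real network calls; uncomment to run):
# upload_test_object("https://nyu1.osn.mghpcc.org", "cis240955-1", checksums_when_required=False)
# upload_test_object("https://nyu1.osn.mghpcc.org", "cis240955-1", checksums_when_required=True)
```

If the first call fails with `MissingContentLength` and the second succeeds, the checksum defaults are the likely culprit.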