# ask-marvin
@Marvin can you give me an example of a task with persistence to a pre-configured s3 block?
thought for 143.2 seconds
Here's an example of a task that persists its result to S3. It assumes the `prefect-aws` integration is installed and an `S3Bucket` block is already configured in your Prefect workspace:
```python
from prefect import flow, task

@task(
    persist_result=True,
    # Slug of a pre-configured S3Bucket block: "<block-type-slug>/<block-name>"
    result_storage="s3-bucket/my-bucket-block",
)
def my_task() -> str:
    # Your task logic here
    return "Some result to persist"

@flow
def my_flow():
    result = my_task()
    return result

if __name__ == "__main__":
    my_flow()
```
Key points about this example:
1. `persist_result=True` tells Prefect to persist the task's return value.
2. `result_storage="s3-bucket/my-bucket-block"` tells Prefect which block to write the result to. Replace `my-bucket-block` with your block's name.
3. The string format is `"<block-type-slug>/<block-name>"`. For a `prefect-aws` `S3Bucket` block the type slug is `s3-bucket`, and the block name is the name you gave the block when you created it. `prefect blocks ls` lists the full slug of every block.

Before running this code, make sure the block already exists in your Prefect workspace. You can create it either through the UI or programmatically:
```python
from prefect_aws import AwsCredentials, S3Bucket

# Example of creating an S3Bucket block programmatically
s3_block = S3Bucket(
    bucket_name="my-bucket",          # Your bucket name
    bucket_folder="prefect-results",  # Optional prefix inside the bucket
    credentials=AwsCredentials(
        aws_access_key_id="your-access-key",
        aws_secret_access_key="your-secret-key",
    ),
)
s3_block.save("my-bucket-block")
```
When the task runs, its result is automatically written to the configured bucket as a serialized result record that Prefect can read back later if needed. You can also configure persistence at the flow level; tasks inside the flow then inherit that setting unless they explicitly override it:
```python
@flow(persist_result=True, result_storage="s3-bucket/my-bucket-block")
def my_flow():
    result = my_task()
    return result
```
Remember that the credentials attached to the block need access to your bucket: the underlying AWS identity must have IAM permissions to write result objects (and to read them back).
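As a rough illustration of "the necessary IAM permissions" on AWS S3 itself, a policy for the identity behind the block could allow writing and reading objects under the results prefix, plus listing the bucket. This is a sketch, not a verified minimum: the helper name, bucket, prefix, and action list are all assumptions to adapt to your setup.

```python
import json


def result_storage_policy(bucket: str, prefix: str = "") -> dict:
    """Build an IAM policy document for reading and writing results under bucket/prefix.

    Hypothetical helper: the bucket, prefix, and action list are assumptions to adapt.
    """
    folder = prefix.strip("/")
    object_path = f"{folder}/*" if folder else "*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Write new result objects and read them back
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/{object_path}",
            },
            {
                # Lets S3 report a missing object as 404 instead of 403
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(result_storage_policy("my-bucket", "prefect-results"), indent=2))
```

Object-level actions are scoped to the prefix, while `s3:ListBucket` has to be granted on the bucket ARN itself, which is why the policy uses two statements.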
@Marvin I have a task that looks like:
```python
@task(result_storage="s3-bucket/osn-bucket", persist_result=True, task_run_name="abc-testing")
def abc_testing() -> str:
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
When I run this code in a flow, I get the following error:
```
16:01:33.731 | ERROR   | Task run 'abc-testing' - An error was encountered while committing transaction '0613ec5710a2f902c0a6231d374db506'
Traceback (most recent call last):
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/transactions.py", line 353, in commit
    self.store.persist_result_record(result_record=self._staged_value)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/results.py", line 823, in persist_result_record
    return self._persist_result_record(
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~^
        result_record=result_record, holder=holder, _sync=True
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 347, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
           ~~~~~~~~~~~^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 329, in result
    return self.future.result(timeout=timeout)
           ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
    return self.__get_result()
           ~~~~~~~~~~~~~~~~~^^
  File "/Users/davsean/.pyenv/versions/3.13.3/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/concurrency/calls.py", line 405, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 341, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/results.py", line 803, in _persist_result_record
    await call_explicitly_async_block_method(
    ...<4 lines>...
    )
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/_internal/compatibility/blocks.py", line 19, in call_explicitly_async_block_method
    return await getattr(block, f"a{method}")(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect_aws/s3.py", line 1255, in awrite_path
    await run_sync_in_worker_thread(self._write_sync, path, content)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        call_with_mark, call, abandon_on_cancel=True, limiter=get_thread_limiter()
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/prefect_aws/s3.py", line 1219, in _write_sync
    s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/boto3/s3/inject.py", line 642, in upload_fileobj
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/futures.py", line 103, in result
    return self._coordinator.result()
           ~~~~~~~~~~~~~~~~~~~~~~~~^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/futures.py", line 264, in result
    raise self._exception
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/tasks.py", line 135, in __call__
    return self._execute_main(kwargs)
           ~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/tasks.py", line 158, in _execute_main
    return_value = self._main(**kwargs)
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/s3transfer/upload.py", line 796, in _main
    client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)
    ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/davsean/Documents/git/omicidx-gh-etl/.venv/lib/python3.13/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (MissingContentLength) when calling the PutObject operation: Unknown
```
Are you able to help diagnose the problem?
thought for 22.3 seconds
The error you're seeing (`MissingContentLength`) is an error response from the storage server: the `PutObject` request that the `S3Bucket` block sends (through boto3's `upload_fileobj`) was rejected because it did not include a `Content-Length` header the server required. Let me point out a few things to check:
1. Your `result_storage` string, `"s3-bucket/osn-bucket"`, already follows the expected format, `"<block-type-slug>/<block-name>"`:
   - `s3-bucket` (the block type slug of `prefect-aws`'s `S3Bucket` block)
   - followed by `/`
   - followed by the name you gave the block when you created it
   The traceback also shows the failing call inside `prefect_aws/s3.py` (`awrite_path`), so Prefect found and loaded the block; the failure happened during the upload itself.
2. Make sure you have properly created and configured your S3 block. Here's how you can create one in code:
```python
from prefect_aws import AwsCredentials, S3Bucket

s3_block = S3Bucket(
    bucket_name="osn-bucket",  # Your actual bucket name
    credentials=AwsCredentials(
        aws_access_key_id="your-access-key",
        aws_secret_access_key="your-secret-key",
    ),
)
s3_block.save("my-block-name")  # The name you'll reference in result_storage
```
Then reference the block from your task by its slug:
```python
from prefect import get_run_logger, task

@task(
    result_storage="s3-bucket/my-block-name",  # Format: "<block-type-slug>/<block-name>"
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
3. If you've already created the block through the UI, you can verify that it exists by loading it:
```python
from prefect_aws import S3Bucket

# This raises an error if the block doesn't exist
block = S3Bucket.load("your-block-name")
```
4. Make sure your AWS credentials have the necessary permissions to write to the bucket.

Because `MissingContentLength` means the server expected a `Content-Length` header and didn't get one, it points at how the client builds the upload request (and what the storage service accepts) rather than at your task code. Could you confirm:
1. How did you create your S3 block (UI or code)?
2. What's the actual name of your block as configured in Prefect?
This will help me provide more specific guidance to resolve the issue.
@Marvin here is the output of prefect blocks ls:
```
prefect blocks  ls
                                                Blocks                                                 
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ID                                   ┃ Type            ┃ Name          ┃ Slug                       ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ac62712b-962c-453b-b5a9-f748c50fbccc │ AWS Credentials │ osn-bucket    │ aws-credentials/osn-bucket │
│ 51dd9b76-8c60-46e9-9bae-796cfc6db4db │ JSON            │ sra-etl-state │ json/sra-etl-state         │
│ 740ba99d-79e8-42fa-973b-113cf2964414 │ S3 Bucket       │ osn-bucket    │ s3-bucket/osn-bucket       │
└──────────────────────────────────────┴─────────────────┴───────────────┴────────────────────────────┘
```
The block was configured with the UI.
thought for 7.6 seconds
Thanks, that listing settles the block reference. Your `S3 Bucket` block is named `osn-bucket` and its slug is `s3-bucket/osn-bucket`, which is exactly the string your task passes to `result_storage`. Result storage references use the form `"<block-type-slug>/<block-name>"`, and `s3-bucket` is the type slug of `prefect-aws`'s `S3Bucket` block, so the reference is correct as written (`"s3/osn-bucket"` would refer to a block type `s3`, which doesn't appear in your listing). Your task can stay as it is; if `logger` isn't already defined in that module, one option is Prefect's run logger:
```python
from prefect import get_run_logger, task

@task(
    result_storage="s3-bucket/osn-bucket",  # Slug as shown by `prefect blocks ls`
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
The reference has two parts:
1. `s3-bucket` is the block type slug shown in the `Slug` column
2. `osn-bucket` is the name of your block
The traceback agrees: the failing call is `awrite_path` in `prefect_aws/s3.py`, so Prefect loaded the block and the upload to the bucket is what failed. You can verify the block is properly configured by trying to load it:
```python
from prefect_aws import S3Bucket

# This should work if the block is properly configured
block = S3Bucket.load("osn-bucket")
```
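Because `result_storage` strings and the `Slug` column share the `"<block-type-slug>/<block-name>"` shape, a quick local check can catch typos before a flow run. The helper below is hypothetical (not part of Prefect), and its naming rule (lowercase letters, digits, and dashes) is an assumption based on how the listed slugs look.

```python
import re

# Assumed naming rule: lowercase letters, digits, and dashes, as in the slugs
# printed by `prefect blocks ls` (e.g. "s3-bucket/osn-bucket").
_PART = re.compile(r"^[a-z0-9-]+$")


def split_block_slug(reference: str) -> tuple[str, str]:
    """Split a "<block-type-slug>/<block-name>" reference into its two parts.

    Hypothetical helper for sanity-checking result_storage strings; it does not
    contact the Prefect API, so it cannot tell whether the block exists.
    """
    parts = reference.split("/")
    if len(parts) != 2:
        raise ValueError(f"expected '<block-type-slug>/<block-name>', got {reference!r}")
    block_type, block_name = parts
    for label, value in (("block type slug", block_type), ("block name", block_name)):
        if not _PART.match(value):
            raise ValueError(f"invalid {label}: {value!r}")
    return block_type, block_name


if __name__ == "__main__":
    print(split_block_slug("s3-bucket/osn-bucket"))  # ('s3-bucket', 'osn-bucket')
```

A passing check only means the string is well-formed; `S3Bucket.load(...)` remains the real test that the block exists.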
If the `MissingContentLength` error persists, the next step is to check the block's configuration. You can view it in the UI, or load the block and print its attributes (just be careful not to print any sensitive credentials).
@Marvin How do I print the attributes? The bucket seems to load correctly.
thought for 25.1 seconds
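Prefect blocks are Pydantic models, so `print(block.model_dump(exclude={"credentials"}))` prints the block's configuration without the credentials. You can also use Python's built-in `dir()` function to see all available attributes (though some might be internal):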
```python
from prefect_aws import S3Bucket

block = S3Bucket.load("osn-bucket")

# Print all public attributes (excluding internal ones that start with _)
for attr in dir(block):
    if not attr.startswith('_'):  # Skip internal attributes
        try:
            value = getattr(block, attr)
            # Only print if it's not a method and not credentials
            if not callable(value) and attr != 'credentials':
                print(f"{attr}: {value}")
        except Exception:
            pass
```
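If you'd rather share the full dump, nested credentials included, a small recursive helper can mask sensitive-looking fields first. This is a minimal sketch that works on a plain nested dict such as the one `block.model_dump()` returns; the list of field-name fragments treated as sensitive is an assumption you may want to extend. (Pydantic `SecretStr` values already print as `'**********'`, but fields such as the access key ID are plain strings.)

```python
from typing import Any

# Assumed field-name fragments to treat as sensitive; extend as needed.
SENSITIVE_FRAGMENTS = ("secret", "token", "password", "access_key")
MASK = "**********"


def redact(value: Any, key: str = "") -> Any:
    """Return a copy of a nested dict/list with sensitive-looking fields masked."""
    if isinstance(value, dict):
        return {k: redact(v, str(k)) for k, v in value.items()}
    if isinstance(value, list):
        return [redact(item, key) for item in value]
    # Mask non-None values whose field name looks sensitive; keep None visible.
    if value is not None and any(f in key.lower() for f in SENSITIVE_FRAGMENTS):
        return MASK
    return value


if __name__ == "__main__":
    example = {
        "bucket_name": "my-bucket",
        "credentials": {
            "aws_access_key_id": "EXAMPLE-KEY-ID",
            "aws_secret_access_key": "not-a-real-secret",
            "aws_session_token": None,
            "aws_client_parameters": {"endpoint_url": "https://example.com"},
        },
    }
    print(redact(example))
```

With a loaded block, `print(redact(block.model_dump()))` would then print the configuration with the credential fields masked but the endpoint and bucket settings visible.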
This will help us verify:
1. The bucket name is correct
2. Any specific path (folder) within the bucket that's configured
3. The AWS region being used
4. Other configuration settings
Since the block loads correctly but you're still getting the `MissingContentLength` error, these details would be helpful. The error means the storage server expected a `Content-Length` header on the upload request and didn't get one, which can depend on how the bucket, endpoint, and client are configured. Also, could you share which versions of `prefect-aws`, `boto3`, and `botocore` you're using? The failing call is boto3's `upload_fileobj`, so all three matter. You can check with:
```python
import boto3
import botocore
import prefect_aws

print("prefect-aws:", prefect_aws.__version__)
print("boto3:", boto3.__version__)
print("botocore:", botocore.__version__)
```
This might help us identify if this is a version-specific issue.
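One version-specific cause worth ruling out: starting with the 1.36 release line (January 2025), boto3/botocore compute and send request checksums by default whenever an operation supports them, and some S3-compatible services reject those uploads; `MissingContentLength` on `PutObject` is among the reported symptoms. The sketch below assumes 1.36.0 is the cut-off and uses only the standard library to check which side of it the installed botocore falls on.

```python
from importlib.metadata import PackageNotFoundError, version

# Assumed cut-off: botocore 1.36.0 turned on default request checksums.
CHECKSUM_DEFAULTS_SINCE = (1, 36, 0)


def parse_version(text: str) -> tuple[int, ...]:
    """Parse a release string such as '1.36.3' into (1, 36, 3)."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():  # stop at suffixes like 'rc1' or 'dev0'
            break
        parts.append(int(piece))
    while len(parts) < 3:  # treat '1.36' as '1.36.0'
        parts.append(0)
    return tuple(parts)


def has_new_checksum_defaults(botocore_version: str) -> bool:
    """True if this botocore version is at or past the assumed cut-off."""
    return parse_version(botocore_version) >= CHECKSUM_DEFAULTS_SINCE


if __name__ == "__main__":
    try:
        installed = version("botocore")
    except PackageNotFoundError:
        print("botocore is not installed in this environment")
    else:
        print(installed, "->", has_new_checksum_defaults(installed))
```

If this prints `True`, the checksum behavior is a plausible suspect for an S3-compatible endpoint; if it prints `False`, look elsewhere first.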
@Marvin here is the model dump for the s3 bucket:
```
S3Bucket(bucket_name='cis240955-1', credentials=AwsCredentials(aws_access_key_id='1V4U7AX6NP8N6DQKNW14', aws_secret_access_key=SecretStr('**********'), aws_session_token=None, profile_name=None, region_name=None, aws_client_parameters=AwsClientParameters(api_version=None, use_ssl=True, verify=None, verify_cert_path=None, endpoint_url='https://nyu1.osn.mghpcc.org', config=None)), bucket_folder='')
```
thought for 12.6 seconds
Ah, now I see something important in your configuration. You're using an S3-compatible storage service (OSN, the Open Storage Network) rather than AWS S3 itself, as indicated by the `endpoint_url` pointing to `https://nyu1.osn.mghpcc.org`. The `MissingContentLength` error is likely related to how this custom endpoint handles the S3 client's upload requests, since S3-compatible services don't always accept every request variant that AWS S3 does. One thing to try is giving the client more explicit settings for this endpoint. You can update the block either through the UI or programmatically:
```python
from prefect_aws import AwsCredentials, S3Bucket
from prefect_aws.client_parameters import AwsClientParameters

# Create the credentials object
credentials = AwsCredentials(
    aws_access_key_id="your-access-key-id",
    aws_secret_access_key="your-secret-key",
    aws_client_parameters=AwsClientParameters(
        endpoint_url="https://nyu1.osn.mghpcc.org",
        # Passed to botocore's Config: signature_version is a top-level option,
        # while addressing_style belongs under "s3"
        config={
            "signature_version": "s3v4",
            "s3": {"addressing_style": "path"},
        },
    ),
)

# Create the S3 bucket block
s3_block = S3Bucket(
    bucket_name="cis240955-1",
    credentials=credentials,
    bucket_folder="",  # Your current setting
)

# Save the block under the same name, replacing the existing one
s3_block.save("osn-bucket", overwrite=True)
```
Then in your task, keep referencing the block by its slug:
```python
from prefect import get_run_logger, task

@task(
    result_storage="s3-bucket/osn-bucket",  # Slug as shown by `prefect blocks ls`
    persist_result=True,
    task_run_name="abc-testing",
)
def abc_testing() -> str:
    logger = get_run_logger()
    logger.info("This is a test task for AWS results storage")
    return "Hello from abc_testing!"
```
The key changes in this configuration:
1. Added explicit botocore client configuration through `AwsClientParameters.config`
2. Set the addressing style to `"path"`, so requests go to `https://<endpoint>/<bucket>/<key>` instead of a bucket subdomain
3. Pinned the `"s3v4"` signature version, which S3-compatible storage services generally support
Also, could you check the version of `prefect-aws` you're using? Older versions may behave differently with S3-compatible storage services. You can upgrade to the latest version with:
```bash
pip install --upgrade prefect-aws
```
If you're still getting the error after these changes, we might need to try a few other S3 client configurations specific to OSN storage. Could you also share:
1. The current version of `prefect-aws` you're using
2. Whether you're able to perform basic S3 operations (like uploading a file) outside of Prefect using the same credentials
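If a standalone upload fails the same way, one workaround to try, assuming botocore 1.36 or newer (whose default is to send request checksums whenever an operation supports them), is to have the client compute and validate checksums only when an operation requires them. botocore reads this from the `AWS_REQUEST_CHECKSUM_CALCULATION` and `AWS_RESPONSE_CHECKSUM_VALIDATION` environment variables, or from the matching `request_checksum_calculation` / `response_checksum_validation` options of its `Config`, which could also go into the block's `AwsClientParameters.config`. A minimal sketch under those assumptions; the variables must be set before the S3 client is created, e.g. at the top of the flow script or in the worker's environment:

```python
import os

# Assumed botocore (>= 1.36) settings: only compute/validate checksums when an
# operation requires them, instead of whenever they are supported.
CHECKSUM_ENV = {
    "AWS_REQUEST_CHECKSUM_CALCULATION": "when_required",
    "AWS_RESPONSE_CHECKSUM_VALIDATION": "when_required",
}

# The same settings as botocore Config options, e.g. for merging into
# AwsClientParameters(config=...) on the block's credentials.
CHECKSUM_CONFIG = {
    "request_checksum_calculation": "when_required",
    "response_checksum_validation": "when_required",
}


def apply_checksum_env(overwrite: bool = False) -> dict[str, str]:
    """Set the checksum environment variables unless they are already set.

    Must run before boto3/botocore create the S3 client; returns the values in effect.
    """
    for name, value in CHECKSUM_ENV.items():
        if overwrite or name not in os.environ:
            os.environ[name] = value
    return {name: os.environ[name] for name in CHECKSUM_ENV}


if __name__ == "__main__":
    print(apply_checksum_env())
```

Setting the variables in the deployment or worker environment avoids code changes entirely; pinning `boto3<1.36` is another stopgap some users have chosen while a storage service catches up.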