<@ULVA73B9P> I'm upgrading to Prefect 3! But I'm ...
# ask-marvin
m
@Marvin I'm upgrading to Prefect 3! But I'm debugging. I want to use `from prefect_aws.secrets_manager import read_secret` to read a secret from a stored block. Provide a code example for Prefect 3; note that my approach returns a coroutine in the REPL.
m
It seems that the `prefect_aws.secrets_manager` module isn't directly available in the Prefect 3.x API. This might be why you're encountering issues when trying to use `read_secret`. Prefect 3.x has undergone significant changes, and some functionality may have been moved or altered. Here's a general approach you can take to read secrets from AWS Secrets Manager using Prefect 3.x:
1. Install necessary packages: ensure you have the `prefect-aws` package installed.
2. Use an async function: async tasks in Prefect 3.x return coroutines when called, and those need to be awaited.
Here's a basic example of how you might structure your code:
from prefect import flow, task
import boto3
import asyncio

@task
async def read_aws_secret(secret_name: str):
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager')

    # Use the client to retrieve the secret
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    return get_secret_value_response['SecretString']

@flow
async def my_flow():
    secret_name = "my_secret"
    secret_value = await read_aws_secret(secret_name)
    print(secret_value)

# Run the flow
if __name__ == "__main__":
    asyncio.run(my_flow())
This example uses the `boto3` library to interact with AWS Secrets Manager directly. The `read_aws_secret` task is declared `async`, so the flow awaits it, although the `boto3` call inside it is itself blocking. For more details on handling secrets and blocks in Prefect 3.x, you can refer to the Prefect documentation on secrets.
m
@Marvin It's in the Prefect AWS docs: https://prefect-python-sdk-docs.netlify.app/prefect_aws/secrets_manager/#prefect_aws.secrets_manager.read_secret I have a working example, but it is returning a coroutine.
import json
from prefect import flow
from prefect.blocks.system import Secret
from prefect.states import Failed
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import read_secret
from src.blocks.classes.credential_pair import CredentialPair
from src.config import PREFECT_AWS_SERVICE_USER
from src.config import PREFECT_G_SERVICE_SECRETNAME

@flow
def get_aws_secret(secret_name, return_string=False):
    """
    Retrieves a secret string or binary from AWS Secrets Manager using a prefect_aws.AWSCredentials block that has IAM user credentials, whose user also has permissions to the secret.
    Returns dictionary by default, whole secret as string if return_string=True
    """
    secret_string = read_secret(
        secret_name=secret_name
        , aws_credentials=AwsCredentials.load(PREFECT_AWS_SERVICE_USER)
        )
    if return_string:
        return secret_string
    else:
        return json.loads(secret_string)
When I run this, I get the error:
TypeError: the JSON object must be str, bytes or bytearray, not coroutine
m
It seems there was an issue accessing the `read_secret` function's signature from the Prefect AWS module. However, based on your description, it looks like `read_secret` is an asynchronous function, which would explain why it returns a coroutine. To fix the error you're encountering, you need to `await` the coroutine returned by `read_secret`. Here's how you can modify your flow to handle this:
import json
from prefect import flow
from prefect.blocks.system import Secret
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import read_secret
from src.blocks.classes.credential_pair import CredentialPair
from src.config import PREFECT_AWS_SERVICE_USER
from src.config import PREFECT_G_SERVICE_SECRETNAME

@flow
async def get_aws_secret(secret_name, return_string=False):
    """
    Retrieves a secret string or binary from AWS Secrets Manager using a prefect_aws.AWSCredentials block that has IAM user credentials, whose user also has permissions to the secret.
    Returns dictionary by default, whole secret as string if return_string=True
    """
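    # In an async flow, AwsCredentials.load also returns a coroutine,
    # so await it before passing the block to read_secret.
    aws_credentials = await AwsCredentials.load(PREFECT_AWS_SERVICE_USER)
    secret_string = await read_secret(
        secret_name=secret_name,
        aws_credentials=aws_credentials
    )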
    if return_string:
        return secret_string
    else:
        return json.loads(secret_string)

# Run the flow
if __name__ == "__main__":
    import asyncio
    asyncio.run(get_aws_secret(PREFECT_G_SERVICE_SECRETNAME))
In this revised code, the `get_aws_secret` function is defined as an asynchronous function, and the `await` keyword is used to handle the coroutine returned by `read_secret`. This should resolve the `TypeError` you encountered. Make sure that any environment running this flow supports asynchronous execution.
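To see why the `TypeError` appears, here is a minimal sketch that uses no Prefect or AWS code at all. `fetch_secret` is a hypothetical stand-in for an async function such as `read_secret`; the names and the JSON payload are assumptions made up for illustration.

```python
import asyncio
import json


async def fetch_secret() -> str:
    # Hypothetical stand-in for an async function; returns a JSON string.
    return '{"user": "svc", "password": "example"}'


def broken() -> dict:
    # Calling an async function without awaiting it yields a coroutine object.
    coro = fetch_secret()
    try:
        return json.loads(coro)  # raises TypeError: json.loads needs str/bytes
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning


async def fixed() -> dict:
    # Awaiting the coroutine produces the string that json.loads expects.
    return json.loads(await fetch_secret())


if __name__ == "__main__":
    try:
        broken()
    except TypeError as exc:
        print(f"TypeError: {exc}")
    print(asyncio.run(fixed()))
```

The same pattern applies to any async callable: the value only exists once the coroutine has been awaited or run on an event loop.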
m
@Marvin I prefer to wrap `read_secret` so it runs in a sync context. How do I do that?
m
To wrap an asynchronous function like `read_secret` so that it can be used in a synchronous context, you can use Python's `asyncio` module. Here's how you can create a wrapper function to achieve this:
import json
import asyncio
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import read_secret
from src.blocks.classes.credential_pair import CredentialPair
from src.config import PREFECT_AWS_SERVICE_USER
from src.config import PREFECT_G_SERVICE_SECRETNAME

def sync_read_secret(secret_name, aws_credentials):
    """
    Synchronously wrap the asynchronous read_secret function.
    """
    # asyncio.run starts a new event loop, runs the coroutine to completion, and closes the loop.
    # It raises RuntimeError if an event loop is already running in this thread.
    return asyncio.run(read_secret(secret_name=secret_name, aws_credentials=aws_credentials))

@flow
def get_aws_secret(secret_name, return_string=False):
    """
    Retrieves a secret string or binary from AWS Secrets Manager using a prefect_aws.AWSCredentials block that has IAM user credentials, whose user also has permissions to the secret.
    Returns dictionary by default, whole secret as string if return_string=True
    """
    secret_string = sync_read_secret(
        secret_name=secret_name,
        aws_credentials=AwsCredentials.load(PREFECT_AWS_SERVICE_USER)
    )
    if return_string:
        return secret_string
    else:
        return json.loads(secret_string)

# Run the flow
if __name__ == "__main__":
    secret = get_aws_secret(PREFECT_G_SERVICE_SECRETNAME)
    print(secret)
### Explanation:
- `sync_read_secret` function: this function runs the asynchronous `read_secret` coroutine on an event loop and blocks until it finishes, so you can call `read_secret` in a synchronous manner.
- Event loop management: `asyncio.run` creates a fresh event loop for the call and closes it afterwards. It is used here instead of `asyncio.get_event_loop().run_until_complete`, because calling `asyncio.get_event_loop()` when no loop is set is deprecated in recent Python versions.
This approach allows you to maintain a synchronous interface while still leveraging asynchronous operations under the hood, as long as no event loop is already running in the calling thread.
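One limitation of a wrapper like `sync_read_secret` is worth seeing in isolation: it only works when no event loop is already running in the calling thread. The sketch below uses `asyncio.run` and a hypothetical stand-in coroutine, `fake_read_secret`, rather than the real `read_secret`, so it runs without Prefect or AWS; all names in it are assumptions for illustration.

```python
import asyncio


async def fake_read_secret(secret_name: str) -> str:
    # Hypothetical stand-in for an async call such as read_secret.
    await asyncio.sleep(0)
    return f"value-for-{secret_name}"


def sync_read(secret_name: str) -> str:
    coro = fake_read_secret(secret_name)
    try:
        # asyncio.run creates a new event loop, runs the coroutine, then closes the loop.
        return asyncio.run(coro)
    except RuntimeError:
        # Typically raised when a loop is already running; close the unused
        # coroutine so Python does not warn that it was never awaited.
        coro.close()
        raise


async def called_from_async_code() -> str:
    # Inside a running loop the wrapper fails instead of returning the value.
    try:
        return sync_read("demo")
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


if __name__ == "__main__":
    print(sync_read("demo"))
    print(asyncio.run(called_from_async_code()))
```

From plain synchronous code the wrapper returns the value; from inside async code it reports a `RuntimeError`, which is the signal to `await` the coroutine directly instead.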
To execute an asynchronous function in a synchronous context, you can use `asyncio.run` or `asyncio.get_event_loop().run_until_complete`. Here's how you can wrap `read_secret` to make it synchronous:
import json
import asyncio
from prefect import flow
from prefect.blocks.system import Secret
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import read_secret
from src.blocks.classes.credential_pair import CredentialPair
from src.config import PREFECT_AWS_SERVICE_USER
from src.config import PREFECT_G_SERVICE_SECRETNAME

def sync_read_secret(secret_name, aws_credentials):
    """
    Synchronously wraps the asynchronous read_secret function.
    """
    # asyncio.run creates a new event loop, runs the coroutine, and closes the loop.
    return asyncio.run(read_secret(
        secret_name=secret_name,
        aws_credentials=aws_credentials
    ))

@flow
def get_aws_secret(secret_name, return_string=False):
    """
    Retrieves a secret string or binary from AWS Secrets Manager using a prefect_aws.AWSCredentials block that has IAM user credentials, whose user also has permissions to the secret.
    Returns dictionary by default, whole secret as string if return_string=True
    """
    secret_string = sync_read_secret(
        secret_name=secret_name,
        aws_credentials=AwsCredentials.load(PREFECT_AWS_SERVICE_USER)
    )
    if return_string:
        return secret_string
    else:
        return json.loads(secret_string)

# Run the flow
if __name__ == "__main__":
    secret = get_aws_secret(PREFECT_G_SERVICE_SECRETNAME)
    print(secret)
This `sync_read_secret` function uses `asyncio.run` to run the asynchronous `read_secret` function in a synchronous manner. This approach lets you keep a synchronous flow while still calling asynchronous functions, provided no event loop is already running in the calling thread.
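If the wrapper might also be called where a loop is already running (for example a Jupyter notebook or the `python -m asyncio` REPL), one possible workaround is to run the coroutine on its own loop in a worker thread. This is only a sketch under assumptions: `run_coro_sync` and `fake_read_secret` are hypothetical names, and it has not been checked against Prefect's run context, which may not carry over to the worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_coro_sync(coro: Coroutine[Any, Any, T]) -> T:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running here: run the coroutine on a separate loop
    # in a worker thread and block until its result is available.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def fake_read_secret(secret_name: str) -> str:
    # Hypothetical stand-in for an async call such as read_secret.
    await asyncio.sleep(0)
    return f"value-for-{secret_name}"


async def called_from_async_code() -> str:
    # Works even though this coroutine is itself running on an event loop.
    return run_coro_sync(fake_read_secret("demo"))


if __name__ == "__main__":
    print(run_coro_sync(fake_read_secret("demo")))
    print(asyncio.run(called_from_async_code()))
```

Note that the fallback branch blocks the calling loop while it waits, so in code that is already async it is usually simpler to `await` the coroutine directly.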