# prefect-community

Robin

07/27/2020, 8:45 AM
Dear all, I am currently trying to use AWS ECR as a container registry, after having successfully experimented with Docker Hub. What is the best way to authenticate to AWS ECR via Prefect? 🙂

Ben Davison

07/27/2020, 11:04 AM
Is this from an EC2 instance? ECS, etc.? Use an instance profile that can access ECR.

Robin

07/27/2020, 12:02 PM
Hey @Ben Davison, thanks for replying! For now, I'm only trying to register a flow from a local terminal, as follows:
```python
from prefect.environments.storage import Docker

flow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"],
                      registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/repository")
```
As a first try, I simply used `aws ecr get-login-password...` to authenticate with AWS, which worked but is of course not a long-term solution.
1. What is an instance profile?

Furthermore, I then got the error `'repository/flow' does not exist`, so it seems that Prefect appends the flow name to the `registry_url`. 🤔
2. How can I avoid the flow name being appended, or how do I need to set this up to avoid the `does not exist` error?
I solved 2 by adding the repository name as `image_name` 🙂
```python
flow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"],
                      registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/", image_name="repository_name")
```
As a Prefect and AWS ECR beginner, I find this a little bit confusing…
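Robin's fix is consistent with Docker storage building the full image reference from `registry_url`, then `image_name`, then the tag, with a name derived from the flow standing in when `image_name` is omitted (which would explain the `'repository/flow'` in the error). The sketch below only imitates that composition under that assumption; `build_image_reference` and `slugify_name` are hypothetical helpers, not Prefect's code, and the exact way Prefect slugifies the flow name is also an assumption.

```python
import re
from pathlib import PurePosixPath
from typing import Optional


def slugify_name(name: str) -> str:
    """Rough stand-in for a slugifier: lowercase, runs of other characters -> '-'."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def build_image_reference(registry_url: str, flow_name: str, image_tag: str,
                          image_name: Optional[str] = None) -> str:
    """Approximate how registry URL, image name and tag combine (assumption)."""
    # Without an explicit image_name, the flow name becomes the repository name,
    # so a registry_url that already ends in a repository gains an extra segment.
    name = image_name or slugify_name(flow_name)
    return f"{PurePosixPath(registry_url, name)}:{image_tag}"


# First attempt: registry_url already contains the repository
first = build_image_reference("user.dkr.ecr.eu-central-1.amazonaws.com/repository",
                              "dbt_flow", image_tag="beta")
# Working version: bare registry plus an explicit image_name
fixed = build_image_reference("user.dkr.ecr.eu-central-1.amazonaws.com/",
                              "dbt_flow", image_tag="beta", image_name="repo_name")
```

In other words, `registry_url` should point at the registry host and `image_name` should name the ECR repository.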
Unfortunately, I am still struggling with AWS credentials:
> botocore.exceptions.NoRegionError: You must specify a region.

I added `AWS_DEFAULT_REGION` and `AWS_CREDENTIALS` to the Prefect Secrets, but something still seems to go wrong…
```python
from pathlib import Path

from prefect import Flow
from prefect.client import Secret
from prefect.environments.storage import Docker
from prefect.tasks.aws import AWSSecretsManager
from prefect.tasks.dbt import DbtShellTask

secret_name = "sf_credentials"

with Flow("dbt_flow") as flow:
    s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")

    s = AWSSecretsManager(secret_name)

    task = DbtShellTask(
        environment="Development",
        dbt_kwargs={
            "type": "snowflake",
            "threads": 1,
            "account": s["sf_account"],
            "user": s["sf_user"],
            "password": s["sf_password"],
        },
        # profiles_dir=PROFILES,
    )(command="dbt run")


helper_files = dict()

for path in Path('../dbt').rglob('*.*'):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)


flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/",
    image_name="repo_name",
    image_tag="beta",
)
```
@nicholas, any ideas?

nicholas

07/27/2020, 3:12 PM
Hi @Robin, it looks like this line might be referencing something that's not available in the flow context:
```python
s = AWSSecretsManager(secret_name)
```
I'm not seeing where you're passing your credentials and region:
```python
s_aws_region = Secret("AWS_DEFAULT_REGION")
s_aws_credentials = Secret("AWS_CREDENTIALS")
```
As for your question about instance profiles: those are an AWS-specific entity related to IAM, and I think the AWS docs will be more helpful to you there than I would be. For registration, Prefect expects your Docker daemon to already be authenticated outside of Prefect, so the machine you register flows from needs to go through that external login step, since ECR is a private registry.
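The external login step nicholas describes is what `aws ecr get-login-password` piped into `docker login` does. A sketch of the same step in Python, assuming `boto3` is installed, AWS credentials are discoverable through boto3's standard configuration, and the `docker` CLI is on `PATH`: ECR's `get_authorization_token` call returns a base64-encoded `user:password` pair together with the registry's `proxyEndpoint`. The helper names `decode_ecr_token` and `ecr_docker_login` are hypothetical.

```python
import base64
import subprocess
from typing import Tuple


def decode_ecr_token(authorization_token: str) -> Tuple[str, str]:
    """Split a base64-encoded 'user:password' ECR token into (user, password)."""
    decoded = base64.b64decode(authorization_token).decode("utf-8")
    user, _, password = decoded.partition(":")
    return user, password


def ecr_docker_login(region: str = "eu-central-1") -> None:
    """Hypothetical helper: log the local Docker daemon in to ECR."""
    import boto3  # imported lazily so decode_ecr_token works without boto3

    ecr = boto3.client("ecr", region_name=region)
    auth = ecr.get_authorization_token()["authorizationData"][0]
    user, password = decode_ecr_token(auth["authorizationToken"])
    # Same effect as: aws ecr get-login-password | docker login --username AWS --password-stdin <registry>
    subprocess.run(
        ["docker", "login", "--username", user, "--password-stdin", auth["proxyEndpoint"]],
        input=password,
        text=True,
        check=True,
    )
```

Calling `ecr_docker_login("eu-central-1")` before `flow.register(...)` would let the Docker daemon push to the private registry. ECR authorization tokens expire after 12 hours, so this automates the login rather than making it permanent; on EC2, an instance profile with ECR permissions (Ben's suggestion) can supply the AWS credentials boto3 uses for the call.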

Robin

07/27/2020, 7:48 PM
Thanks for explaining instance profiles; I will dive deeper into the documentation. Concerning forwarding the credentials: how do I pass them to the `run` method of `AWSSecretsManager`?
```python
import json

from prefect.tasks.secrets import SecretBase
from prefect.utilities.aws import get_boto_client
from prefect.utilities.tasks import defaults_from_attrs


class AWSSecretsManager(SecretBase):
    """
    Task for retrieving secrets from an AWS Secrets Manager and returning it as a dictionary.
    Note that all initialization arguments can optionally be provided or overwritten at runtime.

    For authentication, there are two options: you can set the `AWS_CREDENTIALS` Prefect Secret
    containing your AWS access keys which will be passed directly to the `boto3` client, or you
    can [configure your flow's runtime
    environment](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#guide-configuration)
    for `boto3`.

    Args:
        - secret (str, optional): the name of the secret to retrieve
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """

    def __init__(self, secret: str = None, **kwargs):
        self.secret = secret
        super().__init__(**kwargs)

    @defaults_from_attrs("secret")
    def run(self, secret: str = None, credentials: str = None) -> dict:
        """
        Task run method.

        Args:
            - secret (str): the name of the secret to retrieve
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`.  If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.

        Returns:
            - dict: the contents of this secret, as a dictionary
        """

        if secret is None:
            raise ValueError("A secret name must be provided.")

        secrets_client = get_boto_client("secretsmanager", credentials=credentials)

        secret_string = secrets_client.get_secret_value(SecretId=secret)["SecretString"]

        secret_dict = json.loads(secret_string)

        return secret_dict
```
I thought of the following:
```python
s = AWSSecretsManager(secret_name, credentials=s_aws_credentials)
```
But it gives the following error:
```
TypeError: __init__() got an unexpected keyword argument 'credentials'
```

nicholas

07/27/2020, 7:50 PM
Try:
```python
s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials)
```
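The `TypeError` follows from the class pasted above: `__init__` only knows `secret` and forwards every other keyword to the base `Task` constructor, while `credentials` is a parameter of `run`. Prefect tasks are configured when instantiated and receive run-time arguments when the instance is called inside the flow, hence the second pair of parentheses. A stripped-down imitation of that two-stage pattern, with hypothetical `BaseTask` and `SecretsTask` classes standing in for Prefect's:

```python
class BaseTask:
    """Hypothetical base class that, like a task constructor, accepts only its own options."""

    def __init__(self, name=None):
        self.name = name


class SecretsTask(BaseTask):
    """Hypothetical imitation of AWSSecretsManager's constructor and call pattern."""

    def __init__(self, secret=None, **kwargs):
        self.secret = secret
        super().__init__(**kwargs)  # an unexpected 'credentials' keyword fails here

    def __call__(self, **run_kwargs):
        # Calling the instance only records run-time arguments; run() happens later
        self.run_kwargs = run_kwargs
        return self

    def run(self, secret=None, credentials=None):
        return {"secret": secret or self.secret, "credentials": credentials}


try:
    SecretsTask("sf_credentials", credentials={"ACCESS_KEY": "example"})
except TypeError as exc:
    init_error = str(exc)  # "... got an unexpected keyword argument 'credentials'"

bound = SecretsTask("sf_credentials")(credentials={"ACCESS_KEY": "example"})
result = bound.run(**bound.run_kwargs)
```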

Robin

07/27/2020, 8:04 PM
Ah, thanks! Now I get the following error:
```
aws_access_key = credentials["ACCESS_KEY"]
TypeError: 'Secret' object is not subscriptable
```

nicholas

07/27/2020, 8:10 PM
Ah, it looks like you're trying to access the result of a Secret in a context where the result isn't available. Something to note about Secrets and Parameters in Prefect is that they're all Tasks under the hood, so they need to be accessed the same way. If `credentials` is referencing a Secret in Prefect, it won't have a key called `"ACCESS_KEY"` at script runtime. One way around that is to use the `.get()` method on a Secret task. I think a better pattern, though, is to get the Secret inside the task you'd like to use it in (with the same `.get()` method). That gives you access to all its properties in the task context, which is a better way of handling secrets overall.
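At run time, `credentials` has to be the resolved value, i.e. a dict with the two keys named in the `run` docstring, rather than the `Secret` object. The traceback line `aws_access_key = credentials["ACCESS_KEY"]` suggests Prefect's `get_boto_client` performs essentially the lookup below; `boto_kwargs_from_credentials` is a hypothetical approximation, not Prefect's code, showing the expected shape and how it would map onto `boto3.client` keyword arguments.

```python
import json
from typing import Optional


def boto_kwargs_from_credentials(credentials: Optional[dict]) -> dict:
    """Hypothetical helper: map an AWS_CREDENTIALS dict to boto3.client() keyword args."""
    if not credentials:
        # No explicit keys: boto3 falls back on its standard credential lookup
        return {}
    # This subscript is what fails with "'Secret' object is not subscriptable"
    # when a Secret task is passed instead of its resolved value.
    return {
        "aws_access_key_id": credentials["ACCESS_KEY"],
        "aws_secret_access_key": credentials["SECRET_ACCESS_KEY"],
    }


# The secret is stored as a JSON string with the two documented keys (placeholder values)
raw_secret = '{"ACCESS_KEY": "AKIAEXAMPLE", "SECRET_ACCESS_KEY": "example-secret"}'
client_kwargs = boto_kwargs_from_credentials(json.loads(raw_secret))
# boto3.client("secretsmanager", **client_kwargs) would then use these keys explicitly
```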

Robin

07/27/2020, 8:21 PM
OK, so should `AWSSecretsManager` from `prefect.tasks.aws` be edited such that the credentials are retrieved with the `.get()` method?
```python
    @defaults_from_attrs("secret")
    def run(self, secret: str = None, credentials: str = None) -> dict:
        """
        Task run method.

        Args:
            - secret (str): the name of the secret to retrieve
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`.  If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.

        Returns:
            - dict: the contents of this secret, as a dictionary
        """

        if secret is None:
            raise ValueError("A secret name must be provided.")

        secrets_client = get_boto_client("secretsmanager", credentials=credentials)
```
Currently, `S3Upload`, `S3Download`, and `AWSSecretsManager` all have their `run` method defined as above…
You don’t mean something like this, do you?
```python
s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
```

nicholas

07/27/2020, 8:23 PM
No, I think those are correct: they're not looking for a Secret task, just the string reference.
It's a little tough to keep track of all the variables across these separate blocks of code, but if `s_aws_credentials` is referencing a Secret task, then yes, I think that'd be correct.

Robin

07/27/2020, 8:38 PM
I tried to remove the unimportant lines. Below is the complete flow definition. Unfortunately, I still get the following error:
```
ValueError: Secrets should only be retrieved during a Flow run, not while building a Flow.
```
Flow definition:
```python
import os
from pathlib import Path

import prefect
from prefect import Flow, task
from prefect.environments.storage import Docker
from prefect.tasks.aws import AWSSecretsManager
from prefect.tasks.dbt import DbtShellTask

from prefect.client import Secret

# PROFILES = "/Users/robinbeer/.dbt/"
secret_name = "sf_credentials"


@task
def hello_task(secret):
    print("I've got a secret")
    print(secret)
    # print(secret.get())


with Flow("dbt_flow") as flow:

    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)

    # .get() here executes while the flow is being built, which raises the ValueError above
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())

    # task = DbtShellTask(
    #     environment="Development",
    #     dbt_kwargs={
    #         "type": "snowflake",
    #         "threads": 1,
    #         "account": s["sf_account"],
    #         "user": s["sf_user"],
    #         "password": s["sf_password"],
    #     },
    #     # profiles_dir=PROFILES,
    # )(command="dbt run")


helper_files = dict()

for path in Path("../dbt").rglob("*.*"):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)


flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="782647223753.dkr.ecr.eu-central-1.amazonaws.com/",
    image_name="accure-50a39f7",
    image_tag="beta",
)

# flow.run()
flow.register(project_name="eks_test_01")
```
I just defined a new task as follows, which seems to avoid the error above:
```python
@task
def retrieve_aws_secrets(s_aws_credentials):
    # Calling a task instance inside another task's run happens at flow-run time,
    # when no flow is being built; this is what raises the error below
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())

    return s


with Flow("dbt_flow") as flow:

    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)

    s = retrieve_aws_secrets(s_aws_credentials)
```
However, a new error results:
```
Unexpected error: ValueError('Could not infer an active Flow context.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/Users/robinbeer/dev/code/ACCURE/accure-etl/src/prefect-flows/dbt_flow.py", line 25, in retrieve_aws_secrets
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
    new.bind(
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 523, in bind
    raise ValueError("Could not infer an active Flow context.")
ValueError: Could not infer an active Flow context.
```
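Both `ValueError`s in this part of the thread come from the same split between building a flow and running it: `Secret.get()` refuses to run while a flow is being built, and calling a task instance (which adds it to the flow being built) fails when no flow is being built, as inside another task's `run` at execution time. The sketch below imitates that behaviour with a context variable; `MiniFlow`, `MiniSecret` and `MiniTask` are hypothetical classes, not Prefect's implementation.

```python
from contextvars import ContextVar

# Hypothetical stand-in for "the flow currently being built"
_building_flow = ContextVar("building_flow", default=None)


class MiniFlow:
    def __init__(self, name):
        self.name = name
        self.calls = []

    def __enter__(self):
        self._token = _building_flow.set(self)
        return self

    def __exit__(self, *exc_info):
        _building_flow.reset(self._token)


class MiniSecret:
    def __init__(self, name, store):
        self.name, self.store = name, store

    def get(self):
        # Refuse to resolve while a flow is being built (cf. the first ValueError)
        if _building_flow.get() is not None:
            raise ValueError("Secrets should only be retrieved during a Flow run, "
                             "not while building a Flow.")
        return self.store[self.name]


class MiniTask:
    def __call__(self, **kwargs):
        # Calling a task means "add me to the flow being built" (cf. the second ValueError)
        flow = _building_flow.get()
        if flow is None:
            raise ValueError("Could not infer an active Flow context.")
        flow.calls.append((self, kwargs))
        return self

    def run(self, **kwargs):
        # Direct execution: no flow context needed
        return kwargs
```

In these terms, nicholas's version works because `Secret(...).get()` executes inside a task body at run time, while the `AWSSecretsManager(...)(...)` call happens inside the `with Flow` block at build time. Calling a task's `run` method directly inside another task, as `MiniTask().run(...)` does here, should also sidestep the flow-context error in Prefect 0.x, since `run` does not try to join a flow.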

nicholas

07/27/2020, 8:51 PM
Try this instead:
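```python
from prefect import Flow, task
from prefect.client import Secret
from prefect.tasks.aws import AWSSecretsManager

secret_name = "sf_credentials"

@task
def retrieve_aws_credentials():
    return Secret("AWS_CREDENTIALS").get()

with Flow("dbt_flow") as flow:
    s = AWSSecretsManager(secret_name)(credentials=retrieve_aws_credentials)
```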

Robin

07/27/2020, 9:09 PM
Yep, that worked, thanks! However, I am now back at the initial `NoRegionError`:
```
Unexpected error: NoRegionError('You must specify a region.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 445, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/aws/secrets_manager.py", line 49, in run
    secrets_client = get_boto_client("secretsmanager", credentials=credentials)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/aws.py", line 55, in get_boto_client
    return boto3.client(
  File "/usr/local/lib/python3.8/site-packages/boto3/__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/boto3/session.py", line 258, in client
    return self._session.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/session.py", line 831, in create_client
    client = client_creator.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 83, in create_client
    client_args = self._get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 285, in _get_client_args
    return args_creator.get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 71, in get_client_args
    final_args = self.compute_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 147, in compute_client_args
    endpoint_config = self._compute_endpoint_config(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 218, in _compute_endpoint_config
    return self._resolve_endpoint(**resolve_endpoint_kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 300, in _resolve_endpoint
    return endpoint_bridge.resolve(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 360, in resolve
    resolved = self.endpoint_resolver.construct_endpoint(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
    result = self._endpoint_for_partition(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
```

nicholas

07/27/2020, 9:20 PM
It looks like you'll need to provide a default `region_name`, using environment variables or a configuration file for boto3. https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html should help you configure your machine to talk to AWS.
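nicholas's pointer can be made concrete. Per the boto3 configuration guide, a client's region comes from an explicit `region_name` argument, else the `AWS_DEFAULT_REGION` environment variable, else the `region` entry of the active profile in `~/.aws/config`; `NoRegionError` means none of these was found in the flow-run environment. The function below is a hypothetical, simplified imitation of that lookup (botocore's real resolution chain may consult additional sources), and it reads the config file from a string to stay self-contained.

```python
import configparser
import os
from typing import Mapping, Optional


def resolve_region(explicit: Optional[str] = None,
                   environ: Mapping[str, str] = os.environ,
                   config_text: str = "") -> Optional[str]:
    """Simplified imitation of boto3's region lookup order (not botocore's code)."""
    if explicit:  # 1. region_name passed to boto3.client(...)
        return explicit
    if environ.get("AWS_DEFAULT_REGION"):  # 2. environment variable
        return environ["AWS_DEFAULT_REGION"]
    parser = configparser.ConfigParser()
    parser.read_string(config_text)  # 3. [default] profile in ~/.aws/config
    return parser.get("default", "region", fallback=None)


# None of the three sources set: this is the situation that ends in NoRegionError
missing = resolve_region(None, {}, "")
from_env = resolve_region(None, {"AWS_DEFAULT_REGION": "eu-central-1"}, "")
from_file = resolve_region(None, {}, "[default]\nregion = eu-central-1\n")
```

Robin's eventual fix works through the second source: setting `AWS_DEFAULT_REGION` inside a task, where it is visible to the `boto3` client created at run time.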

Robin

07/27/2020, 9:34 PM
Ah, it finally worked with the following code:
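```python
import os

from prefect import Flow, task
from prefect.client import Secret
from prefect.tasks.aws import AWSSecretsManager

# hello_task, retrieve_aws_credentials and secret_name are defined as earlier in the thread


@task
def set_aws_region():
    s_aws_region = Secret("AWS_DEFAULT_REGION").get()

    # Set inside a task, so it applies where the flow's tasks actually run
    os.environ["AWS_DEFAULT_REGION"] = s_aws_region


with Flow("dbt_flow") as flow:

    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)

    region_set = set_aws_region()

    # upstream_tasks makes sure the region is set before the boto3 client is created
    s = AWSSecretsManager(secret_name)(
        credentials=retrieve_aws_credentials(), upstream_tasks=[region_set]
    )
```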
I hadn't realized that I also have to set `os.environ` in a task instead of just in the flow. Thanks @nicholas, that took me a while :S

nicholas

07/27/2020, 9:35 PM
Ah yes, the reason for that is that running it in the Flow context doesn't set the variable in the flow run context 🙂
Glad you got it figured out
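A plain-Python illustration of nicholas's point, under the assumption that the flow runs in a separate process (here, a Docker container built from the stored flow) where the code inside the `with Flow` block is not executed again: only code that actually runs in that process, i.e. task bodies, changes its environment. The subprocess with an empty environment is a hypothetical stand-in for that container.

```python
import os
import subprocess
import sys


def run_in_fresh_process(code: str) -> str:
    """Run code in a new interpreter with an empty environment (like a new container)."""
    result = subprocess.run([sys.executable, "-c", code], env={},
                            capture_output=True, text=True, check=True)
    return result.stdout.strip()


# "Build time": setting the variable in the registering process...
os.environ["AWS_DEFAULT_REGION"] = "eu-central-1"
# ...is not visible to code running in the separate run-time process
at_run_time = run_in_fresh_process("import os; print(os.environ.get('AWS_DEFAULT_REGION'))")

# Setting it in the code that executes at run time (the task body) does work
TASK_BODY = ("import os; os.environ['AWS_DEFAULT_REGION'] = 'eu-central-1'; "
             "print(os.environ.get('AWS_DEFAULT_REGION'))")
inside_task = run_in_fresh_process(TASK_BODY)
```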

Robin

07/27/2020, 9:36 PM
Somehow, I assumed that the Python interpreter would automatically check whether some secrets have a well-known name (in this case `AWS_DEFAULT_REGION`), so that one does not have to “manually” add them with `os.environ`. Is that assumption generally incorrect, or is boto3 causing this trouble?
> Ah yes, the reason for that is that running it in the Flow context doesn’t set the variable in the flow run context 🙂
Had to read the sentence twice 😄 Thanks again! Always good to solve problems just before midnight for some good sleep 🙏