Robin
07/27/2020, 8:45 AMBen Davison
07/27/2020, 11:04 AMRobin
07/27/2020, 12:02 PMflow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"], 
                    registry_url="<http://user.dkr.ecr.eu-central-1.amazonaws.com/repository|user.dkr.ecr.eu-central-1.amazonaws.com/repository>", 
                    )aws ecr get-login-password...instance profile'repository/flow' does not existdoes not existRobin
07/27/2020, 2:04 PMimage_nameflow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"], registry_url="<http://user.dkr.ecr.eu-central-1.amazonaws.com/|user.dkr.ecr.eu-central-1.amazonaws.com/>", image_name="repository_name")Robin
07/27/2020, 3:06 PMAWS_DEFAULT_REGIONAWS_CREDENTIALSs_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    s = AWSSecretsManager(secret_name)
    task = DbtShellTask(
        environment="Development",
        dbt_kwargs={
            "type": "snowflake",
            "threads": 1,
            "account": s["sf_account"],
            "user": s["sf_user"],
            "password": s["sf_password"],
        },
        # profiles_dir=PROFILES,
    )(command="dbt run")
helper_files = dict()
for path in Path('../dbt').rglob('*.*'):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)
flow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"], registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/", image_name="repo_name", image_tag="beta")nicholas
s = AWSSecretsManager(secret_name)s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")nicholas
Robin
07/27/2020, 7:48 PMclass AWSSecretsManager(SecretBase):
    """
    Task for retrieving secrets from an AWS Secrets Manager and returning it as a dictionary.
    Note that all initialization arguments can optionally be provided or overwritten at runtime.
    For authentication, there are two options: you can set the `AWS_CREDENTIALS` Prefect Secret
    containing your AWS access keys which will be passed directly to the `boto3` client, or you
    can [configure your flow's runtime
    environment](<https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#guide-configuration>)
    for `boto3`.
    Args:
        - secret (str, optional): the name of the secret to retrieve
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """
    def __init__(self, secret: str = None, **kwargs):
        self.secret = secret
        super().__init__(**kwargs)
    @defaults_from_attrs("secret")
    def run(self, secret: str = None, credentials: str = None) -> dict:
        """
        Task run method.
        Args:
            - secret (str): the name of the secret to retrieve
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`.  If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.
        Returns:
            - dict: the contents of this secret, as a dictionary
        """
        if secret is None:
            raise ValueError("A secret name must be provided.")
        secrets_client = get_boto_client("secretsmanager", credentials=credentials)
        secret_string = secrets_client.get_secret_value(SecretId=secret)["SecretString"]
        secret_dict = json.loads(secret_string)
        return secret_dicts = AWSSecretsManager(secret_name, credentials=s_aws_credentials)TypeError: __init__() got an unexpected keyword argument 'credentials'nicholas
s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials)Robin
07/27/2020, 8:04 PMaws_access_key = credentials["ACCESS_KEY"]
TypeError: 'Secret' object is not subscriptablenicholas
credentials"ACCESS_KEY".get()Robin
07/27/2020, 8:21 PMfrom prefect.tasks.aws.AWSSecretsManager.get()@defaults_from_attrs("secret")
    def run(self, secret: str = None, credentials: str = None) -> dict:
        """
        Task run method.
        Args:
            - secret (str): the name of the secret to retrieve
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`.  If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.
        Returns:
            - dict: the contents of this secret, as a dictionary
        """
        if secret is None:
            raise ValueError("A secret name must be provided.")
        secrets_client = get_boto_client("secretsmanager", credentials=credentials)S3UploadS3DownloadAWSSecretsManagerRobin
07/27/2020, 8:23 PMs = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())nicholas
nicholas
s_aws_credentialsRobin
07/27/2020, 8:38 PMValueError: Secrets should only be retrieved during a Flow run, not while building a Flow.import os
from pathlib import Path
import prefect
from prefect import Flow, task
from prefect.environments.storage import Docker
from prefect.tasks.aws import AWSSecretsManager
from prefect.tasks.dbt import DbtShellTask
from prefect.client import Secret
# PROFILES = "/Users/robinbeer/.dbt/"
secret_name = "sf_credentials"
@task
def hello_task(secret):
    print("I've got a secret")
    print(secret)
    # print(secret.get())
with Flow("dbt_flow") as flow:
    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
    # task = DbtShellTask(
    #     environment="Development",
    #     dbt_kwargs={
    #         "type": "snowflake",
    #         "threads": 1,
    #         "account": s["sf_account"],
    #         "user": s["sf_user"],
    #         "password": s["sf_password"],
    #     },
    #     # profiles_dir=PROFILES,
    # )(command="dbt run")
helper_files = dict()
for path in Path("../dbt").rglob("*.*"):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)
flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="<http://782647223753.dkr.ecr.eu-central-1.amazonaws.com/|782647223753.dkr.ecr.eu-central-1.amazonaws.com/>",
    image_name="accure-50a39f7",
    image_tag="beta",
)
# flow.run()
flow.register(project_name="eks_test_01")Robin
07/27/2020, 8:47 PM@task
def retrieve_aws_secrets(s_aws_credentials):
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
    return s
with Flow("dbt_flow") as flow:
    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    s = retrieve_aws_secrets(s_aws_credentials)Unexpected error: ValueError('Could not infer an active Flow context.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/Users/robinbeer/dev/code/ACCURE/accure-etl/src/prefect-flows/dbt_flow.py", line 25, in retrieve_aws_secrets
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
    new.bind(
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 523, in bind
    raise ValueError("Could not infer an active Flow context.")
ValueError: Could not infer an active Flow context.nicholas
@task
def retrieve_aws_credentials():
    return Secret("AWS_CREDENTIALS").get()
with Flow("dbt_flow") as flow:
    s = AWSSecretsManager(secret_name)(credentials=retrieve_aws_credentials)Robin
07/27/2020, 9:09 PMNoRegionErrorUnexpected error: NoRegionError('You must specify a region.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 445, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/aws/secrets_manager.py", line 49, in run
    secrets_client = get_boto_client("secretsmanager", credentials=credentials)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/aws.py", line 55, in get_boto_client
    return boto3.client(
  File "/usr/local/lib/python3.8/site-packages/boto3/__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/boto3/session.py", line 258, in client
    return self._session.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/session.py", line 831, in create_client
    client = client_creator.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 83, in create_client
    client_args = self._get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 285, in _get_client_args
    return args_creator.get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 71, in get_client_args
    final_args = self.compute_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 147, in compute_client_args
    endpoint_config = self._compute_endpoint_config(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 218, in _compute_endpoint_config
    return self._resolve_endpoint(**resolve_endpoint_kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 300, in _resolve_endpoint
    return endpoint_bridge.resolve(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 360, in resolve
    resolved = self.endpoint_resolver.construct_endpoint(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
    result = self._endpoint_for_partition(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.nicholas
region_nameRobin
07/27/2020, 9:34 PM@task
def set_aws_region():
    s_aws_region = Secret("AWS_DEFAULT_REGION").get()
    os.environ["AWS_DEFAULT_REGION"] = s_aws_region
with Flow("dbt_flow") as flow:
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    set_aws_region()
    s = AWSSecretsManager(secret_name)(credentials=retrieve_aws_credentials())os.environnicholas
nicholas
Robin
07/27/2020, 9:36 PMAWS_DEFAULT_REGIONos.environ.Robin
07/27/2020, 9:37 PMAh yes, the reason for that is that running it in the Flow context doesn’t set the variable in the flow run context 🙂Had to read the sentence twice 😄 Thanks again! Always good to solve problems just before midnight for some good sleep 🙏
