Robin
07/27/2020, 8:45 AM
Ben Davison
07/27/2020, 11:04 AM
Robin
07/27/2020, 12:02 PM
flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/repository",
)
As a first try I simply used aws ecr get-login-password... to authenticate with AWS, which worked, but of course is not a long-term solution.
1. What is an instance profile?
Furthermore, afterwards I got the error that 'repository/flow' does not exist. So it seems like Prefect adds the flow name to the registry_url. 🤔
2. How can I avoid the appending of the flow name, or how do I need to use this to avoid the 'does not exist' error?
image_name 🙂
flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/",
    image_name="repository_name",
)
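The 'repository/flow' error and the image_name fix above can be illustrated with a small sketch of how a registry URL, an image name and a tag combine into one image reference. This is not Prefect's implementation: compose_image_ref is a hypothetical helper, and the fallback to the flow name when image_name is missing is an assumption inferred from the error message in this thread.

```python
from typing import Optional


def compose_image_ref(
    registry_url: str,
    image_name: Optional[str],
    image_tag: str,
    flow_name: str = "flow",
) -> str:
    """Join registry, image name and tag into one Docker image reference.

    Assumption for illustration only: when image_name is not given,
    the flow name is used as the image name.
    """
    name = image_name if image_name is not None else flow_name
    return f"{registry_url.rstrip('/')}/{name}:{image_tag}"


registry = "user.dkr.ecr.eu-central-1.amazonaws.com"

# Repository inside the URL, no image_name: the flow name becomes a sub-path,
# which ECR would treat as a different (non-existent) repository.
print(compose_image_ref(f"{registry}/repository", None, "beta"))

# Registry host only, repository passed as image_name.
print(compose_image_ref(f"{registry}/", "repository_name", "beta"))
```

Under these assumptions, putting only the registry host in registry_url and the ECR repository name in image_name gives a reference that points at the existing repository.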
As a Prefect and AWS ECR beginner, I find this a little bit confusing…
[…] AWS_DEFAULT_REGION and AWS_CREDENTIALS to the Prefect Secrets, but something seems to go wrong…
s_aws_region = Secret("AWS_DEFAULT_REGION")
s_aws_credentials = Secret("AWS_CREDENTIALS")
s = AWSSecretsManager(secret_name)
task = DbtShellTask(
    environment="Development",
    dbt_kwargs={
        "type": "snowflake",
        "threads": 1,
        "account": s["sf_account"],
        "user": s["sf_user"],
        "password": s["sf_password"],
    },
    # profiles_dir=PROFILES,
)(command="dbt run")
helper_files = dict()
for path in Path('../dbt').rglob('*.*'):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)
flow.storage = Docker(files=helper_files, python_dependencies=["prefect[aws]"], registry_url="user.dkr.ecr.eu-central-1.amazonaws.com/", image_name="repo_name", image_tag="beta")
@nicholas, any ideas?
nicholas
s = AWSSecretsManager(secret_name)
I'm not seeing where you're passing your credentials and region:
s_aws_region = Secret("AWS_DEFAULT_REGION")
s_aws_credentials = Secret("AWS_CREDENTIALS")
Robin
07/27/2020, 7:48 PM
class AWSSecretsManager(SecretBase):
    """
    Task for retrieving secrets from an AWS Secrets Manager and returning it as a dictionary.
    Note that all initialization arguments can optionally be provided or overwritten at runtime.
    For authentication, there are two options: you can set the `AWS_CREDENTIALS` Prefect Secret
    containing your AWS access keys which will be passed directly to the `boto3` client, or you
    can [configure your flow's runtime
    environment](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#guide-configuration)
    for `boto3`.
    Args:
        - secret (str, optional): the name of the secret to retrieve
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor
    """
    def __init__(self, secret: str = None, **kwargs):
        self.secret = secret
        super().__init__(**kwargs)
    @defaults_from_attrs("secret")
    def run(self, secret: str = None, credentials: str = None) -> dict:
        """
        Task run method.
        Args:
            - secret (str): the name of the secret to retrieve
            - credentials (dict, optional): your AWS credentials passed from an upstream
                Secret task; this Secret must be a JSON string
                with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
                passed directly to `boto3`. If not provided here or in context, `boto3`
                will fall back on standard AWS rules for authentication.
        Returns:
            - dict: the contents of this secret, as a dictionary
        """
        if secret is None:
            raise ValueError("A secret name must be provided.")
        secrets_client = get_boto_client("secretsmanager", credentials=credentials)
        secret_string = secrets_client.get_secret_value(SecretId=secret)["SecretString"]
        secret_dict = json.loads(secret_string)
        return secret_dict
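The docstring above says the AWS_CREDENTIALS secret must be a JSON string with the two keys ACCESS_KEY and SECRET_ACCESS_KEY. A minimal sketch of what that shape implies is shown below. The helper name boto3_kwargs_from_secret and the placeholder values are assumptions for illustration; only the two key names come from the docstring, and aws_access_key_id / aws_secret_access_key are the keyword names that boto3 clients accept. How Prefect itself maps the keys internally is not shown in this thread.

```python
import json


def boto3_kwargs_from_secret(raw: str) -> dict:
    """Turn an AWS_CREDENTIALS-style JSON string into boto3 client kwargs.

    Hypothetical helper: it only checks the two keys named in the docstring
    and renames them to the keyword arguments boto3 expects.
    """
    creds = json.loads(raw)
    missing = {"ACCESS_KEY", "SECRET_ACCESS_KEY"} - set(creds)
    if missing:
        raise KeyError(f"AWS_CREDENTIALS is missing keys: {sorted(missing)}")
    return {
        "aws_access_key_id": creds["ACCESS_KEY"],
        "aws_secret_access_key": creds["SECRET_ACCESS_KEY"],
    }


# Placeholder values, not real credentials.
example_secret = '{"ACCESS_KEY": "example-key-id", "SECRET_ACCESS_KEY": "example-secret"}'
client_kwargs = boto3_kwargs_from_secret(example_secret)
print(sorted(client_kwargs))
```

Note that the region is not part of this secret, so it has to reach boto3 some other way.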
I thought of the following:
s = AWSSecretsManager(secret_name, credentials=s_aws_credentials)
But it gives the following error:
TypeError: __init__() got an unexpected keyword argument 'credentials'
nicholas
s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials)
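The two pairs of parentheses in the line above separate constructor arguments from run arguments, which is why passing credentials to the constructor raised the TypeError. The sketch below shows that split with a hypothetical stand-in class; SketchSecretTask is not the Prefect API, and unlike a Prefect task called inside a Flow block (which only binds the task to the flow), it executes immediately when called.

```python
class SketchSecretTask:
    """Hypothetical stand-in for a task class with separate init and run args."""

    def __init__(self, secret=None):
        # Constructor arguments configure the task itself.
        self.secret = secret

    def run(self, secret=None, credentials=None):
        # Run arguments are supplied per call and may override init defaults.
        secret = secret if secret is not None else self.secret
        if secret is None:
            raise ValueError("A secret name must be provided.")
        return {"secret": secret, "has_credentials": credentials is not None}

    def __call__(self, **run_kwargs):
        # In this sketch, calling the instance runs it right away.
        return self.run(**run_kwargs)


init_error = None
try:
    # credentials is a run argument, so the constructor rejects it.
    SketchSecretTask("sf_credentials", credentials={"ACCESS_KEY": "x"})
except TypeError as exc:
    init_error = str(exc)

# First parentheses: constructor. Second parentheses: run arguments.
result = SketchSecretTask("sf_credentials")(credentials={"ACCESS_KEY": "x"})
print(init_error)
print(result)
```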
Robin
07/27/2020, 8:04 PM
aws_access_key = credentials["ACCESS_KEY"]
TypeError: 'Secret' object is not subscriptable
nicholas
credentials is referencing a Secret in Prefect, so it won't have a property called "ACCESS_KEY" at script runtime. One way you can get around that is by using the .get() method on a Secret task. I think a better pattern though would be to instead get the Secret inside the task you'd like to use it in (and use the same get method). This'll give you access to all its properties in the task context, which is a better way of handling secrets overall.
Robin
07/27/2020, 8:21 PM
[…] from prefect.tasks.aws.AWSSecretsManager be edited such that the credentials are called with the .get() method?
@defaults_from_attrs("secret")
def run(self, secret: str = None, credentials: str = None) -> dict:
    """
    Task run method.
    Args:
        - secret (str): the name of the secret to retrieve
        - credentials (dict, optional): your AWS credentials passed from an upstream
            Secret task; this Secret must be a JSON string
            with two keys: `ACCESS_KEY` and `SECRET_ACCESS_KEY` which will be
            passed directly to `boto3`. If not provided here or in context, `boto3`
            will fall back on standard AWS rules for authentication.
    Returns:
        - dict: the contents of this secret, as a dictionary
    """
    if secret is None:
        raise ValueError("A secret name must be provided.")
    secrets_client = get_boto_client("secretsmanager", credentials=credentials)
Currently, S3Upload, S3Download and AWSSecretsManager all have the run method defined as above…
s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
nicholas
If s_aws_credentials is referencing a Secret task, then yes, I think that'd be correct
Robin
07/27/2020, 8:38 PM
ValueError: Secrets should only be retrieved during a Flow run, not while building a Flow.
Flow definition:
import os
from pathlib import Path
import prefect
from prefect import Flow, task
from prefect.environments.storage import Docker
from prefect.tasks.aws import AWSSecretsManager
from prefect.tasks.dbt import DbtShellTask
from prefect.client import Secret
# PROFILES = "/Users/robinbeer/.dbt/"
secret_name = "sf_credentials"
@task
def hello_task(secret):
    print("I've got a secret")
    print(secret)
    # print(secret.get())
with Flow("dbt_flow") as flow:
    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
    # task = DbtShellTask(
    #     environment="Development",
    #     dbt_kwargs={
    #         "type": "snowflake",
    #         "threads": 1,
    #         "account": s["sf_account"],
    #         "user": s["sf_user"],
    #         "password": s["sf_password"],
    #     },
    #     # profiles_dir=PROFILES,
    # )(command="dbt run")
helper_files = dict()
for path in Path("../dbt").rglob("*.*"):
    source_file_path = str(path.absolute())
    dest_file_path = f"src/{str(path)[3:]}"
    helper_files[source_file_path] = dest_file_path
    print(dest_file_path)
flow.storage = Docker(
    files=helper_files,
    python_dependencies=["prefect[aws]"],
    registry_url="782647223753.dkr.ecr.eu-central-1.amazonaws.com/",
    image_name="accure-50a39f7",
    image_tag="beta",
)
# flow.run()
flow.register(project_name="eks_test_01")
@task
def retrieve_aws_secrets(s_aws_credentials):
    s = AWSSecretsManager(secret_name)(credentials=s_aws_credentials.get())
    return s
with Flow("dbt_flow") as flow:
    # s_aws_region = Secret("AWS_DEFAULT_REGION")
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    s = retrieve_aws_secrets(s_aws_credentials)
However, a new error results:
Unexpected error: ValueError('Could not infer an active Flow context.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/Users/robinbeer/dev/code/ACCURE/accure-etl/src/prefect-flows/dbt_flow.py", line 25, in retrieve_aws_secrets
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 470, in __call__
    new.bind(
  File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 523, in bind
    raise ValueError("Could not infer an active Flow context.")
ValueError: Could not infer an active Flow context.
nicholas
@task
def retrieve_aws_credentials():
    return Secret("AWS_CREDENTIALS").get()
with Flow("dbt_flow") as flow:
    s = AWSSecretsManager(secret_name)(credentials=retrieve_aws_credentials)
Robin
07/27/2020, 9:09 PM
NoRegionError
I got:
Unexpected error: NoRegionError('You must specify a region.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 801, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 445, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/aws/secrets_manager.py", line 49, in run
    secrets_client = get_boto_client("secretsmanager", credentials=credentials)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/aws.py", line 55, in get_boto_client
    return boto3.client(
  File "/usr/local/lib/python3.8/site-packages/boto3/__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/boto3/session.py", line 258, in client
    return self._session.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/session.py", line 831, in create_client
    client = client_creator.create_client(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 83, in create_client
    client_args = self._get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 285, in _get_client_args
    return args_creator.get_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 71, in get_client_args
    final_args = self.compute_client_args(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 147, in compute_client_args
    endpoint_config = self._compute_endpoint_config(
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 218, in _compute_endpoint_config
    return self._resolve_endpoint(**resolve_endpoint_kwargs)
  File "/usr/local/lib/python3.8/site-packages/botocore/args.py", line 300, in _resolve_endpoint
    return endpoint_bridge.resolve(
  File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 360, in resolve
    resolved = self.endpoint_resolver.construct_endpoint(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 133, in construct_endpoint
    result = self._endpoint_for_partition(
  File "/usr/local/lib/python3.8/site-packages/botocore/regions.py", line 148, in _endpoint_for_partition
    raise NoRegionError()
botocore.exceptions.NoRegionError: You must specify a region.
nicholas
[…] region_name, using environment variables or a configuration file for boto3. https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html should help you configure your machine to talk with AWS
Robin
07/27/2020, 9:34 PM
@task
def set_aws_region():
    s_aws_region = Secret("AWS_DEFAULT_REGION").get()
    os.environ["AWS_DEFAULT_REGION"] = s_aws_region
with Flow("dbt_flow") as flow:
    s_aws_credentials = Secret("AWS_CREDENTIALS")
    hello_task(s_aws_credentials)
    set_aws_region()
    s = AWSSecretsManager(secret_name)(credentials=retrieve_aws_credentials())
It took me a while to understand that I also have to run os.environ in a task instead of just in the flow.
Thanks @nicholas, that took me a while :S
nicholas
Robin
07/27/2020, 9:36 PM
[…] AWS_DEFAULT_REGION) such that one does not have to “manually” add these with os.environ.
Is that generally incorrect, or does boto3 cause these troubles?
Ah yes, the reason for that is that running it in the Flow context doesn’t set the variable in the flow run context 🙂
Had to read the sentence twice 😄 Thanks again! Always good to solve problems just before midnight for some good sleep 🙏
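The region issue discussed above can be sketched in plain Python. boto3 accepts an explicit region_name and also reads the AWS_DEFAULT_REGION environment variable; the sketch below mimics only those two sources and leaves out the configuration file mentioned in the thread. resolve_region and set_aws_region_task are hypothetical names, and the point is only that the environment variable has to be set in the process that actually executes the task.

```python
import os


def resolve_region(region_name=None):
    """Pick a region: explicit argument first, then AWS_DEFAULT_REGION.

    Simplified stand-in for the lookup; the real boto3 chain also
    consults configuration files.
    """
    region = region_name or os.environ.get("AWS_DEFAULT_REGION")
    if not region:
        raise RuntimeError("You must specify a region.")
    return region


def set_aws_region_task(region):
    # Meant to run as a task, i.e. in the same process as the later boto3 call.
    os.environ["AWS_DEFAULT_REGION"] = region


# Start from a clean environment for the demonstration.
os.environ.pop("AWS_DEFAULT_REGION", None)

error_before = None
try:
    resolve_region()
except RuntimeError as exc:
    error_before = str(exc)

set_aws_region_task("eu-central-1")
region_after = resolve_region()
print(error_before)
print(region_after)
```

Passing the region explicitly, as in resolve_region("eu-central-1"), avoids depending on process-wide state at all, where the calling code allows it.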