Hamza Ahmed

09/18/2020, 5:34 PM
I am running into an issue when using credentials from AWSSecretsManager to fetch data from PostgresFetch as below:
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.tasks.postgres.postgres import PostgresFetch

@task
def print_me(to_print):
    print(to_print)


with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    print_me(credentials)
    pg_user = credentials['username']
    pg_pass = credentials['password']
    pg_host = 'localhost'
    pg_port = credentials['port']
    pg_query = 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;'
    runsql = PostgresFetch(db_name='prefect', user=pg_user, host='localhost')
    result = runsql(password=pg_pass, query=pg_query, fetch='all')
    print_me(result)

flow.run()
The PostgresFetch initialization doesn't work when I try to use `user=credentials['username']`, but it does when I hardcode the username, or even if I set `pg_user` to a string containing the username. The flow run produces the output below:

nicholas

09/18/2020, 5:40 PM
Hi @Hamza Ahmed - would you mind moving your flow run output here and removing it from the main channel? Typically we like to avoid big walls of code in the main channel.

Hamza Ahmed

09/18/2020, 5:53 PM
[2020-09-18 17:29:36] INFO - prefect.FlowRunner | Beginning Flow run for 'fetching-data'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'AWSSecretsManager': Starting task run...
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'AWSSecretsManager': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'print_me': Starting task run...
{'username': 'prefect', 'password': 'prefectpass', 'engine': 'postgres', 'host': 'dbidentifier.randstring.aws-region.rds.amazonaws.com', 'port': 5432, 'dbInstanceIdentifier': 'instancename'}
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'print_me': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'PostgresFetch': Starting task run...
[2020-09-18 17:29:37] ERROR - prefect.TaskRunner | Unexpected error: OperationalError('FATAL:  password authentication failed for user "<Task: GetItem>"\nFATAL:  password authentication failed for user "<Task: GetItem>"\n')
Traceback (most recent call last):
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\engine\task_runner.py", line 822, in get_task_run_state
    value = timeout_handler(
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\utilities\executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\utilities\tasks.py", line 427, in method
    return run_method(self, *args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\tasks\postgres\postgres.py", line 190, in run
    conn = pg.connect(
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\psycopg2\__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  password authentication failed for user "<Task: GetItem>"
FATAL:  password authentication failed for user "<Task: GetItem>"

[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'PostgresFetch': finished task run for task with final state: 'Failed'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'print_me': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'print_me': finished task run for task with final state: 'TriggerFailed'
[2020-09-18 17:29:37] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Process finished with exit code 0
Instead of passing the value for `credentials['username']`, it is passing the task itself.
@nicholas Sorry about that, I thought it was one big message
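The `<Task: GetItem>` in the error can be reproduced without Prefect or a database. The sketch below is only an illustration, not Prefect's implementation: `Deferred` and `Connector` are hypothetical stand-ins for a task whose result is not yet known and for a task class that stores its constructor arguments as-is.

```python
class Deferred:
    """Hypothetical stand-in for a task whose result only exists at run time."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Task: {self.name}>"


class Connector:
    """Hypothetical stand-in for a task class configured through its constructor."""

    def __init__(self, user):
        # Stored untouched at build time; nothing resolves it later.
        self.user = user

    def dsn(self, password):
        # Formatting the placeholder falls back to its repr.
        return f"user={self.user} password={password}"


deferred_user = Deferred("GetItem")
connector = Connector(user=deferred_user)
dsn = connector.dsn(password="prefectpass")
print(dsn)
```

The password arrives as a plain string because it is supplied when the work actually runs, while the user is still the placeholder object that was handed to the constructor.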

Skip Breidbach

09/18/2020, 6:28 PM
@nicholas is there something about class init parameters that doesn't convert Tasks to their actual values (i.e. the results of .run())? That's what it looks like is happening here

nicholas

09/18/2020, 6:46 PM
No worries @Hamza Ahmed ! The answer is yes @Skip Breidbach - when you instantiate a Task class, you can only pass in build-time values, not run-time values. When you need values that are only available as the result of another task's run method, you need to pass them to downstream tasks as run-time arguments. Example:
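from prefect import Flow, Parameter, Task

class FullName(Task):
  def __init__(self, first_name=None, last_name=None, **kwargs):
    # Build-time defaults; these must be plain values, not other tasks
    self.first_name = first_name
    self.last_name = last_name
    super().__init__(**kwargs)

  def run(self, first_name=None, last_name=None):
    # Run-time values take precedence over the build-time defaults
    first_name = first_name if first_name is not None else self.first_name
    last_name = last_name if last_name is not None else self.last_name

    return {"first_name": first_name, "last_name": last_name}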



with Flow("some flow") as flow:
  first_name = Parameter("first_name")
  last_name = "Boseman"

  full_name = FullName(last_name=last_name)(first_name=first_name)

  # At build time, the FullName class is initialized only with the last_name param
  # which we pass as a static string
  # However, it can't initialize first_name until runtime, because first_name
  # is the result of another task's (the Parameter task) runtime
  # 
  # if you tried to pass first_name in this way:
  # full_name = FullName(first_name=first_name, last_name=last_name)
  # you would be passing a reference to the Parameter task itself
  # (since instantiation of classes in the flow context happens at build time)
  # instead of passing a reference to its runtime result
So @Hamza Ahmed - this means that if you want to pass args to a class instantiation (like `PostgresFetch`) that are determined by the result of another task's run, you'll need to pass them to the task's run method instead. In this particular case, you'll probably need to extend the `PostgresFetch` class, since it doesn't take user and host args at runtime, OR create a wrapper task that you can pass runtime args to, which can return the result of `PostgresFetch.run`:
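class RuntimePostgresFetch(PostgresFetch):
  def run(self, user, db_name, password, query, fetch='all'):
    # Overwrite the build-time attributes with the resolved run-time values
    self.user = user
    self.db_name = db_name
    # password, query and fetch are already run-time args of PostgresFetch.run
    return super().run(password=password, query=query, fetch=fetch)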
^ class extension method OR
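@task
def postgres_fetch_wrapper(user, db_name, password, pg_query):
  # Inside a running task the arguments are already resolved values, so
  # PostgresFetch can be built and run directly
  return PostgresFetch(db_name=db_name, user=user, host='localhost').run(password=password, query=pg_query, fetch='all')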
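Both workarounds rely on the same idea: arguments bound to a task's run step are resolved to upstream results before the task executes, while constructor arguments are not. The following is a rough, self-contained sketch of that two-phase model. `MiniTask` and `run_task` are hypothetical names invented for illustration and say nothing about how Prefect is actually implemented.

```python
class MiniTask:
    """Hypothetical, heavily simplified task: NOT Prefect's implementation."""

    def __init__(self, fn, name):
        self.fn = fn
        self.name = name
        self.upstream = {}  # keyword name -> MiniTask or plain value

    def bind(self, **kwargs):
        # Build time: only record where each run argument will come from.
        self.upstream = kwargs
        return self

    def __repr__(self):
        return f"<Task: {self.name}>"


def run_task(task, cache=None):
    # Run time: resolve upstream tasks to their results, then call fn.
    cache = {} if cache is None else cache
    if task.name not in cache:
        resolved = {
            key: run_task(value, cache) if isinstance(value, MiniTask) else value
            for key, value in task.upstream.items()
        }
        cache[task.name] = task.fn(**resolved)
    return cache[task.name]


get_user = MiniTask(lambda: "prefect", name="GetItem")
connect = MiniTask(lambda user: f"connecting as {user}", name="Fetch").bind(user=get_user)

message = run_task(connect)
print(message)
```

Here `connect` receives the string produced by `get_user`, not the `MiniTask` object, because the dependency was recorded at build time and resolved only when `run_task` executed it.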

Skip Breidbach

09/18/2020, 7:48 PM
Got it @nicholas. @Hamza Ahmed and I had tried something similar to the 2nd option, but without the .run() - we tried instantiating a `PostgresFetch` task and returning it (essentially a class factory method) but that gave us an error like "you are trying to modify a task outside of the flow" or something like that. Our takeaway for creating our own Task objects is to make sure that only guaranteed-static items go in the constructor. Thanks for the flavor and the workarounds!

nicholas

09/18/2020, 7:49 PM
Happy to help @Skip Breidbach - let me know if you hit any other snags.

Skip Breidbach

09/18/2020, 7:50 PM
Thanks, that's appreciated. We're really liking the platform, just hit an occasional subtlety like this 🙂

Hamza Ahmed

09/21/2020, 1:08 PM
Thank you @nicholas! :)