Hamza Ahmed
09/18/2020, 5:34 PM
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.tasks.postgres.postgres import PostgresFetch

@task
def print_me(to_print):
    print(to_print)

with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    print_me(credentials)
    pg_user = credentials['username']
    pg_pass = credentials['password']
    pg_host = 'localhost'
    pg_port = credentials['port']
    pg_query = 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;'
    runsql = PostgresFetch(db_name='prefect', user=pg_user, host='localhost')
    result = runsql(password=pg_pass, query=pg_query, fetch='all')
    print_me(result)

flow.run()
The `PostgresFetch` initialization doesn't work when I try to use `user=credentials['username']`, but it does when I hardcode the username, or even if I set `pg_user` to a string containing the username.
The failing flow run produces the output below:
nicholas
09/18/2020, 5:40 PM
Hamza Ahmed
09/18/2020, 5:53 PM
[2020-09-18 17:29:36] INFO - prefect.FlowRunner | Beginning Flow run for 'fetching-data'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'AWSSecretsManager': Starting task run...
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'AWSSecretsManager': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'print_me': Starting task run...
{'username': 'prefect', 'password': 'prefectpass', 'engine': 'postgres', 'host': 'dbidentifier.randstring.aws-region.rds.amazonaws.com', 'port': 5432, 'dbInstanceIdentifier': 'instancename'}
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'print_me': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:36] INFO - prefect.TaskRunner | Task 'PostgresFetch': Starting task run...
[2020-09-18 17:29:37] ERROR - prefect.TaskRunner | Unexpected error: OperationalError('FATAL: password authentication failed for user "<Task: GetItem>"\nFATAL: password authentication failed for user "<Task: GetItem>"\n')
Traceback (most recent call last):
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\engine\runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\engine\task_runner.py", line 822, in get_task_run_state
    value = timeout_handler(
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\utilities\executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\utilities\tasks.py", line 427, in method
    return run_method(self, *args, **kwargs)
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\prefect\tasks\postgres\postgres.py", line 190, in run
    conn = pg.connect(
  File "C:\Users\hahmed\repos\prefectsecrets\venv\lib\site-packages\psycopg2\__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: password authentication failed for user "<Task: GetItem>"
FATAL: password authentication failed for user "<Task: GetItem>"
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'PostgresFetch': finished task run for task with final state: 'Failed'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'GetItem': finished task run for task with final state: 'Success'
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'print_me': Starting task run...
[2020-09-18 17:29:37] INFO - prefect.TaskRunner | Task 'print_me': finished task run for task with final state: 'TriggerFailed'
[2020-09-18 17:29:37] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Process finished with exit code 0
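Why the log shows `user "<Task: GetItem>"`: inside the `with Flow(...)` block, `credentials` is a task, not a dict, so `credentials['username']` adds another task (named `GetItem` in the log) instead of returning a string. The constructor stores that object as-is, and its text form ends up in the connection settings. Below is a minimal, stdlib-only toy model of that mechanism, not Prefect's implementation; `Deferred` and `ToyFetch` are hypothetical names, and the secret value is made up.

```python
class Deferred:
    """Toy stand-in for a task whose value only exists at run time."""

    def __init__(self, name, compute):
        self.name = name
        self.compute = compute  # zero-argument callable that produces the value

    def __getitem__(self, key):
        # Indexing does not fetch anything yet; it creates another deferred
        # step, roughly like Prefect adding a GetItem task to the flow
        return Deferred("GetItem", lambda: self.compute()[key])

    def __repr__(self):
        return f"<Task: {self.name}>"


class ToyFetch:
    """Hypothetical task that captures `user` in its constructor."""

    def __init__(self, user):
        self.user = user  # stored as-is; nothing resolves it later

    def run(self):
        # Formats the user into a connection string, as a DB driver would
        return f"user={self.user}"


secrets = Deferred("AWSSecretsManager", lambda: {"username": "prefect"})
fetch = ToyFetch(user=secrets["username"])
print(fetch.run())  # user=<Task: GetItem>
```

Passing a plain string instead (`ToyFetch(user="prefect")`) works, which matches the observation that hardcoding the username fixes the flow.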
`credentials['password']`
it is passing the task
Skip Breidbach
09/18/2020, 6:28 PM
nicholas
09/18/2020, 6:46 PM
from prefect import Flow, Parameter, Task

class FullName(Task):
    def __init__(self, first_name=None, last_name=None, **kwargs):
        self.first_name = first_name
        self.last_name = last_name
        super().__init__(**kwargs)

    def run(self, first_name=None, last_name=None):
        # Values passed at runtime take precedence over the ones set at build time
        first_name = first_name if first_name is not None else self.first_name
        last_name = last_name if last_name is not None else self.last_name
        return {"first_name": first_name, "last_name": last_name}

with Flow("some flow") as flow:
    first_name = Parameter("first_name")
    last_name = "Boseman"
    full_name = FullName(last_name=last_name)(first_name=first_name)
    # At build time, the FullName class is initialized only with the last_name param,
    # which we pass as a static string.
    # However, it can't initialize first_name until runtime, because first_name
    # is the result of another task's (the Parameter task's) run.
    #
    # If you tried to pass first_name this way:
    # full_name = FullName(first_name=first_name, last_name=last_name)
    # you would be passing a reference to the Parameter task object itself
    # (since instantiation of classes in the flow context happens at build time)
    # instead of a reference to its runtime result.
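The "runtime value wins, otherwise fall back to the constructor value" pattern in `FullName` is common enough that Prefect 1.x ships a decorator for it, `prefect.utilities.tasks.defaults_from_attrs`. Below is a simplified, stdlib-only re-implementation of the idea, not Prefect's code: `defaults_from_attrs_sketch` is a hypothetical name, it treats `None` as "not supplied", and it only accepts keyword arguments. The `FullName` here is a plain class, not a Prefect `Task`, and the first name is an illustrative value.

```python
import functools


def defaults_from_attrs_sketch(*attr_names):
    """Fill run() keyword args left as None from same-named instance attributes."""

    def decorator(run_method):
        @functools.wraps(run_method)
        def wrapper(self, **kwargs):
            for name in attr_names:
                # A runtime value wins; otherwise fall back to the build-time one
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run_method(self, **kwargs)

        return wrapper

    return decorator


class FullName:
    """Plain-Python stand-in for the FullName task above."""

    def __init__(self, first_name=None, last_name=None):
        self.first_name = first_name
        self.last_name = last_name

    @defaults_from_attrs_sketch("first_name", "last_name")
    def run(self, first_name=None, last_name=None):
        return {"first_name": first_name, "last_name": last_name}


full_name = FullName(last_name="Boseman")
print(full_name.run(first_name="Chadwick"))
```

The decorator keeps `run()` itself free of the `x if x is not None else self.x` boilerplate, so each task only declares which attributes may be overridden at runtime.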
`PostgresFetch`) that are determined by the result of another task's run, you'll need to pass them to the task's runtime context instead. In this particular case, you'll probably need to extend the `PostgresFetch` class, since its `run` method doesn't take `user` and `host` args at runtime, OR create a wrapper task that you can pass runtime args to, which can return the result of `PostgresFetch.run`:
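class RuntimePostgresFetch(PostgresFetch):
    def run(self, user, db_name, password, query=None, fetch="one"):
        # Overwrite the build-time connection settings with the runtime values
        self.user = user
        self.db_name = db_name
        # PostgresFetch.run() requires the password; return its query result
        return super().run(password=password, query=query, fetch=fetch)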
^ class extension method
OR
@task
def postgres_fetch_wrapper(user, db_name, password, pg_query):
    # `user` is a plain string by now, so PostgresFetch can be built and run here
    return PostgresFetch(db_name=db_name, user=user, host='localhost').run(
        password=password, query=pg_query, fetch='all')
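The wrapper approach works because upstream results are resolved before a task's run step is called, so the wrapper body only ever sees plain values. Below is a toy, stdlib-only model of that rule, not Prefect's implementation; `Deferred`, `run_task`, and `fetch_wrapper` are hypothetical names, and the credential values are made up.

```python
class Deferred:
    """Toy placeholder for an upstream task's not-yet-computed result."""

    def __init__(self, compute):
        self.compute = compute  # zero-argument callable that produces the value


def run_task(fn, **kwargs):
    """Toy runner: resolve any Deferred run-time arguments, then call fn."""
    resolved = {
        key: value.compute() if isinstance(value, Deferred) else value
        for key, value in kwargs.items()
    }
    return fn(**resolved)


def fetch_wrapper(user, db_name, password, query):
    # Stand-in for postgres_fetch_wrapper: when this body runs, `user` and
    # `password` are already plain strings, so building a client here is safe
    return {"user": user, "db_name": db_name, "password": password, "query": query}


credentials = {"username": "prefect", "password": "example-password"}
user = Deferred(lambda: credentials["username"])
password = Deferred(lambda: credentials["password"])

result = run_task(fetch_wrapper, user=user, db_name="prefect",
                  password=password, query="SELECT 1;")
print(result["user"])  # prefect
```

A `Deferred` handed to a constructor at build time never goes through `run_task`, which is the same gap that produced the `<Task: GetItem>` user in the log.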
Skip Breidbach
09/18/2020, 7:48 PM
`PostgresFetch` task and returning it (essentially a class factory method), but that gave us an error like "you are trying to modify a task outside of the flow" or something like that. It's a good lesson for us: when creating our own Task classes, make sure only guaranteed-static items are passed to the constructor. Thanks for the flavor and the workarounds!
nicholas
09/18/2020, 7:49 PM
Skip Breidbach
09/18/2020, 7:50 PM
Hamza Ahmed
09/21/2020, 1:08 PM