Nicholas Hemley
07/24/2021, 9:44 AM
Amanda Wee
07/24/2021, 10:26 AM
Nicholas Hemley
07/24/2021, 10:49 AM
Kevin Kho
emre
07/24/2021, 8:02 PM
Kevin Kho
del xxx and then gc.collect(), but as you say, it's not so nice when you have two separate tasks, because it becomes hard to delete upstream variables in downstream operations. This is on our radar, but in the meantime it can be addressed by combining the tasks together.
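For illustration, here is a minimal sketch of that pattern in plain Python, with no Prefect involved. The names fetch_rows, upload and extract_and_upload are hypothetical stand-ins for the query step, the upload step and the combined task; none of them come from a library.

```python
import gc


def fetch_rows(n):
    """Hypothetical stand-in for the database query step."""
    return [{"id": i, "value": i * 2} for i in range(n)]


def upload(rows):
    """Hypothetical stand-in for the upload step; returns the row count sent."""
    return len(rows)


def extract_and_upload(n):
    """Run both steps in one unit of work so the intermediate can be dropped."""
    rows = fetch_rows(n)
    uploaded = upload(rows)
    # Drop the only reference to the intermediate result. CPython frees the
    # list once its reference count reaches zero; gc.collect() additionally
    # cleans up any reference cycles left behind.
    del rows
    gc.collect()
    return uploaded


print(extract_and_upload(1000))
```

Because both steps live in one function, the large intermediate never has to be handed to a separate downstream step, which is what makes the del possible.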
If you had two separate tasks, you could call MSSQLTask.run() and GCSUpload.run() inside a bigger task. You do lose individual retries, but I think that would be true even if you combined the tasks yourself. If you only wanted to retry the GCSUpload, you'd need to run through the MSSQLTask again as well once they are combined.
emre
07/24/2021, 9:27 PM
Kevin Kho
Nicholas Hemley
07/26/2021, 3:17 PM
Kevin Kho
se = PrefectSecret()
with Flow("param-test") as flow:
    y = se.map(["SLACK_WEBHOOK_URL", "SLACK_WEBHOOK_URL"])
    abc(y)
This will give you a list of passwords. You can then map over the connection/password pairs with your db-related tasks. Does that make sense?
Nicholas Hemley
07/26/2021, 4:15 PM
Nicholas Hemley
07/26/2021, 4:15 PM
@mock.patch.dict(os.environ, {"PREFECT__CONTEXT__SECRETS__GS_MYSQL1": "password1", "PREFECT__CONTEXT__SECRETS__GS_MYSQL2": "password2"})
def test_11(self):
    prefect_secret = PrefectSecret()
    with Flow("test_11") as flow:
        credentials = prefect_secret.map(["GS_MYSQL1", "GS_MYSQL2"])
        self.test_11_task(self, credentials=credentials)
    flow.run_config = LocalRun(env={"PREFECT__CONTEXT__SECRETS__GS_MYSQL1": "password1", "PREFECT__CONTEXT__SECRETS__GS_MYSQL2": "password2"})
    flow.run()
Nicholas Hemley
07/26/2021, 4:16 PM
Kevin Kho
Kevin Kho
export PREFECT__CONTEXT__SECRETS__FOO_FOO="BAR"
and then
from prefect import task, Flow, Parameter
import prefect
from prefect.tasks.secrets import PrefectSecret

@task
def abc(x):
    logger = prefect.context.get("logger")
    logger.info(f"{x}")
    return x

se = PrefectSecret()
with Flow("param-test") as flow:
    y = se.map(["FOO_FOO", "FOO_FOO"])
    abc(y)
flow.run()
and this worked for me.
Nicholas Hemley
07/26/2021, 6:38 PM
Nicholas Hemley
07/27/2021, 10:04 AM
Nicholas Hemley
07/27/2021, 10:04 AM
def test_20(self):
    """
    Puts together both approaches to demonstrate settings & secrets together
    Test Run configuration:
    PREFECT__CONTEXT__SECRETS__GS_MYSQL1= "password1";PREFECT__CONTEXT__SECRETS__GS_MYSQL2= "password2"
    :return:
    """
    os.environ["GS_MYSQL"] = '{"GS_MYSQL1": {"host": "host1", "port": "3306",' \
                             '"username": "gs-user", "client_id": "1"},' \
                             '"GS_MYSQL2":{"host": "host2", "port": "3306",' \
                             '"username": "gs-user", "client_id": "2"} }'
    p = EnvVarSecret('GS_MYSQL')
    config_string = p.run()
    config_dict = ast.literal_eval(config_string)
    # print(config_dict)
    prefect_secret = PrefectSecret()
    with Flow("test_20") as flow:
        # note: in Python 3, dict.keys() returns a dict_keys view, so convert it to a list
        key_list = list(config_dict.keys())
        credentials_list = prefect_secret.map(key_list)
        combined_list = combine_credentials(config_dict, credentials_list)
    state = flow.run()
    m = state.result[combined_list]
    # print(m.result)
    assert len(m.result.keys()) == 2
    for key in m.result.keys():
        assert len(m.result[key]) == 5
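The combine_credentials helper is not shown in the thread. As a rough sketch only, one plausible shape is a function that pairs each connection's settings with the password at the same position. The "password" key name, the reliance on matching order, and the use of json.loads on the sample settings are all assumptions made for illustration; in the flow it would presumably also be wrapped as a task.

```python
import json


def combine_credentials(config, passwords):
    """Merge each connection's settings with the password at the same index.

    Assumes passwords is ordered like list(config.keys()), which matches how
    key_list is built before the secret task is mapped over it.
    """
    combined = {}
    for (name, settings), password in zip(config.items(), passwords):
        # Build a new dict so the original settings are left untouched.
        combined[name] = {**settings, "password": password}
    return combined


# Sample settings copied from the test above, parsed as JSON.
config = json.loads(
    '{"GS_MYSQL1": {"host": "host1", "port": "3306",'
    ' "username": "gs-user", "client_id": "1"},'
    ' "GS_MYSQL2": {"host": "host2", "port": "3306",'
    ' "username": "gs-user", "client_id": "2"}}'
)
merged = combine_credentials(config, ["password1", "password2"])
```

With four settings per connection plus the password, each merged entry has five keys, which is what the assertions in test_20 check for.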
Nicholas Hemley
07/27/2021, 10:05 AM
Nicholas Hemley
07/27/2021, 10:05 AM
Kevin Kho