Adam Shamlian
05/20/2021, 3:18 PM
@task
def generate_inputs_from_params(args):
    # ...

@task
def create_db_conn(args):
    # ...

@task
def do_db_work(args):
    # ...

@task
def do_some_other_work(args):
    # ...

with Flow("example") as f:
    # Parameter tasks
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)  # from param tasks
    conn_map = create_db_conn.map(conn_inputs)
    res_map = do_db_work.map(conn_map, db_work_inputs)
    res2_map = do_some_other_work.map(res_map, other_work_inputs)
    # some reduce func if necessary
I have two questions about this:

1. Is that flow constructed properly? I'm ultimately after something like:

inputs = generate_inputs_from_params(args)  # from param tasks
for (conn_input, db_work_input, other_work_input) in inputs:
    conn = create_db_conn(conn_input)
    res = do_db_work(conn, db_work_input)
    res2 = do_some_other_work(res, other_work_input)

2. When mapping over credentials dynamically, would I inject `Secret`s into the conn_inputs, or would I resolve the proper Secret "within the for loop" (i.e. extending the map chain to include an additional layer that resolves `Secret`s)? My understanding of the docs is that if I do the former, the secret data would be exposed in conn_inputs, which in a distributed environment means that plaintext credentials could be making network hops, or in any environment would be persisted as part of Result instances. I'd like to make sure I'm understanding this correctly.

Kevin Kho
conn_map = create_db_conn.map(conn_inputs)
looks like it's returning connections? When using Dask for parallelization, task outputs need to be serializable by cloudpickle. If you combine them into one task that creates the connection and does the work, it might work.
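As a rough illustration of that suggestion - a sketch only, assuming Prefect 1.x; the task name plus connect() and run_query() are placeholders for whatever driver/query code is actually in play, not anything from this thread:

from prefect import task

@task
def create_conn_and_do_db_work(conn_input, db_work_input):
    # Build the connection inside the task so the connection object itself
    # never has to be pickled by cloudpickle or shipped between Dask workers;
    # only the (picklable) query result crosses the task boundary.
    conn = connect(**conn_input)               # placeholder for your DB driver
    try:
        return run_query(conn, db_work_input)  # placeholder; return plain data only
    finally:
        conn.close()

In the flow, this one task would then be mapped over conn_inputs and db_work_inputs in place of the create_db_conn / do_db_work pair.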
Adam Shamlian
05/20/2021, 3:26 PM
Kevin Kho
PrefectSecret("MYSECRET").rum()
Adam Shamlian
05/20/2021, 3:37 PM
create_db_conn() and `do_db_work()` would become a single do_db_work(conn_string_args, other_args). However, that first conn_string_args arg would include credentials, either from the generate_inputs() task or as a Secret + other conn inputs from generate_inputs()
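A sketch of that merged shape, assuming Prefect 1.x, with the credential resolved inside the task via the secret task's .run() rather than shipped in through conn_string_args - the secret name, connect(), and run_query() are placeholders, and the other task names reuse the ones from the first flow above:

from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret

@task
def do_db_work(conn_string_args, other_args):
    # conn_string_args carries only the non-secret pieces (host, db, user, ...);
    # the credential is resolved here, so it never appears as an upstream task
    # result or in persisted Result instances.
    password = PrefectSecret("DB_PASSWORD").run()           # placeholder secret name
    conn = connect(password=password, **conn_string_args)   # placeholder driver call
    try:
        return run_query(conn, other_args)                  # placeholder; return picklable data only
    finally:
        conn.close()

with Flow("example-merged") as f:
    # unpacking like this assumes generate_inputs_from_params is defined with @task(nout=3)
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)
    res_map = do_db_work.map(conn_inputs, db_work_inputs)
    res2_map = do_some_other_work.map(res_map, other_work_inputs)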
Adam Shamlian
05/20/2021, 3:38 PM
Kevin Kho
Adam Shamlian
05/20/2021, 3:41 PM
Kevin Kho
Adam Shamlian
05/20/2021, 3:43 PM
Secret instance? (i think that this is what you're getting back to me on in a bit, now that i type this out - no rush)
Kevin Kho
Adam Shamlian
05/20/2021, 3:59 PM
with Flow('test') as f:
    secret = PrefectSecret("MY_SECRET")  # or a custom override of SecretBase
    res = do_work(secret)

and this means that the plaintext only lives within the task itself, regardless of environment/Executor?

@task
def do_work_secret_inside():
    secret_text = PrefectSecret("MY_SECRET").get()
    return do_work(secret_text)  # not a task, but equivalent to do_work in the flow above

with Flow('test2') as f:
    res = do_work_secret_inside()
Kevin Kho
Adam Shamlian
05/20/2021, 4:06 PM
PrefectSecret("MY_SECRET").run()
(which is what you said initially, lol - sorry, I just wanted to really make sure I understood the implications here)
Kevin Kho
Adam Shamlian
05/20/2021, 4:09 PM