# ask-community
a
Hi all. I have a set of inputs that I am going to map to a series of tasks that dynamically touch databases. Consequently, I don't want to directly pass database connections (to avoid any nonsense in a distributed environment). If I have:
```python
from prefect import task, Flow

@task
def generate_inputs_from_params(args):
    # ....
    ...

@task
def create_db_conn(args):
    # ....
    ...

@task
def do_db_work(args):
    # ....
    ...

@task
def do_some_other_work(args):
    # ...
    ...

with Flow("example") as f:
    # Parameter tasks
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)  # from param tasks
    conn_map = create_db_conn.map(conn_inputs)
    res_map = do_db_work.map(conn_map, db_work_inputs)
    res2_map = do_some_other_work.map(res_map, other_work_inputs)
    # some reduce func if necessary
```
I have two questions about this: 1. Is that flow constructed properly - I'm ultimately after something like:
```python
inputs = generate_inputs_from_params(args)  # from param tasks
for (conn_input, db_work_input, other_work_input) in inputs:
    conn = create_db_conn(conn_input)
    res = do_db_work(conn, db_work_input)
    res2 = do_some_other_work(res, other_work_input)
```
2. When mapping over credentials dynamically, would I inject `Secret`s into the `conn_inputs`, or would I resolve the proper `Secret` "within the for loop" (i.e. extending the map chain to include an additional layer that resolves `Secret`s)? My understanding of the docs is that if I do the former, the secret data would be exposed in `conn_inputs`, which in a distributed environment means that plaintext credentials could be making network hops, or in any environment would be persisted as part of `Result` instances. I'd like to make sure I'm understanding this correctly.
k
Hi @Adam Shamlian! On the first question, I am not 100% sure that will work, because `conn_map = create_db_conn.map(conn_inputs)` looks like it's returning connections. When using Dask for parallelization, task outputs need to be serializable by cloudpickle, and live database connections generally are not. If you combine them into one task that creates the connection and does the work, it might work.
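Something like this, as a minimal sketch (illustrative names; sqlite3 stands in for whatever driver you actually use):
```python
from prefect import task

@task
def create_conn_and_do_db_work(conn_input, db_work_input):
    # The connection is created, used, and closed inside a single task,
    # so it never has to be cloudpickled and shipped between Dask workers.
    import sqlite3  # stand-in for your real database driver
    conn = sqlite3.connect(conn_input)
    try:
        return conn.execute(db_work_input).fetchall()
    finally:
        conn.close()

# In the flow, the two mapped tasks then collapse into one:
#     res_map = create_conn_and_do_db_work.map(conn_inputs, db_work_inputs)
```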
a
yeah, I assumed that the scheduler is smart enough to keep the elements of the for-loop together on workers, in which case I think it would work as is, but you are right to raise this point. I will lump the work + conn together; better not to rely on what would be an implementation detail at best. good catch.
k
I think the issue with the first is that the serialization happens during build time, which will fail. I understand what you're saying with the second question. You want to pass connections so you don't need to pass secrets, right?
Maybe you can load it in the task instead, like `PrefectSecret("MYSECRET").run()`.
a
Going off the above, I won't pass conns directly. `create_db_conn()` and `do_db_work()` would become a single `do_db_work(conn_string_args, other_args)`. However, that first `conn_string_args` arg would include credentials, either from the `generate_inputs()` task or as a `Secret` plus the other conn inputs from `generate_inputs()`.
is loading a `Secret` within a task considered to be idiomatic? my understanding is that tasks of tasks is not really The Way
k
Actually, let me confirm whether that Secret is being passed in plain text. But tasks can be used within tasks in general (yes, a bit ugly, but maybe it is The Way here). Will get back to you in a bit.
a
ok, quick question about understanding Secrets in general. they are serialized as instructions to load the secret, and only fully resolved when the flow (or task) is actually run, correct?
k
That one is a yes. All tasks are deferred execution, and Secrets are just a special kind of task.
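A quick sketch of what that means: building the flow below fetches nothing, and the value only exists once the `PrefectSecret` task actually runs:
```python
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

@task
def use(value):
    ...  # receives the resolved value only at flow runtime

with Flow("deferred-secrets") as flow:
    secret = PrefectSecret("MY_SECRET")  # just a node in the DAG; nothing is fetched here
    use(secret)

# `secret` is a PrefectSecret task instance, not the secret's value;
# the value is resolved only when flow.run() executes that task.
```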
a
so if a Secret is in a task, or part of a flow, the only way it would "leak" its secret would be if you construct a task that consumes the secret data in its final form rather than actually consuming a `Secret` instance? (i think that this is what you're getting back to me on in a bit, now that i type this out - no rush)
k
Yep that’s what I’m double checking.
Ok, so the secret is passed in plain text if not done inside a task, unless you have some special Dask configuration. Using it inside a task reduces the surface area of where it's exposed, but in general execution happens in our users' environments, so it's not something we give opinions on. We leave security to our users. Hope that gives you enough information though.
a
so this means that the Secret is resolved to plaintext, potentially (but not necessarily - just a quirk of environments and Executor config) anywhere:
```python
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow('test') as f:
    secret = PrefectSecret("MY_SECRET")  # or a custom override of SecretBase
    res = do_work(secret)
```
and this means that the plaintext only lives within the task itself, regardless of environment/Executor?
```python
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

@task
def do_work_secret_inside():
    secret_text = PrefectSecret("MY_SECRET").run()
    return do_work(secret_text)  # not a task, but equivalent to do_work in the flow above

with Flow('test2') as f:
    res = do_work_secret_inside()
```
k
It does get resolved to plain text in general during runtime, and users need to be careful about how they use it (like just don't print or log it). Yes, in the second one it's loaded inside that task, so if it already executes on a worker, at least the surface area is reduced, as long as it's not being returned.
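For instance, this (hypothetical) task would undo the benefit:
```python
from prefect import task
from prefect.tasks.secrets import PrefectSecret

@task
def leaky_task():
    # Anti-pattern: returning the resolved value makes it a task output,
    # so it can be pickled, shipped between workers, and persisted as a
    # Result, reintroducing exactly the exposure you were trying to avoid.
    return PrefectSecret("MY_SECRET").run()
```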
a
got it, so to sum up, `Secret`s are really just to prevent storage of `Result`s - they don't really have any security implications for the actual resolution of their data with respect to how they are treated by Executors/schedulers. and the final takeaway here is that for guaranteed minimal exposure, I should treat them like I would db_conns and just directly create and resolve them in the task(s) that consume them with `PrefectSecret("MY_SECRET").run()` (which is what you said initially, lol - sorry, I just wanted to really make sure I understood the implications here)
k
Yes to everything, unless you explicitly save it, which would be bad practice.
a
lol yep. ok, thanks again for all the help