hey everyone, beginner question: i have a flow tha...
# ask-community
c
hey everyone, beginner question: i have a flow that looks like this:
Copy code
with Flow('fiege_po_uploads', schedule=cron_schedule) as fiege_po_uploads:
    po_nos_query = build_po_nos_query.map(brand=brands)
    po_nos = snowflake_queryfetch.map(
        query=po_nos_query, upstream_tasks=[po_nos_query])
    po_query = build_po_data_query.map(
        po_no=po_nos, brand=brands, upstream_tasks=[po_nos])
brands is a list of 2 strings. in the third task I'm trying to use 2 mapped arguments and i can kinda understand that this doesn't really work. however what is the proper way to do something like this? i'm a complete beginner to both prefect and anything functional, so i probably have a conceptual misunderstanding here. essentially what i'm trying to do here is having an outside loop over
brands
, an inside loop over
po_nos
, and be able to use the value of
brands
in every inner iteration.
k
is the list of brands something you want to parameterize? did you consider just making a list of queries and then mapping that?
Have you also taken a look at the
Flatten
part of the docs? This might help think through this https://docs.prefect.io/core/concepts/mapping.html#flat-mapping
c
the list of queries approach doesn't really work in my case cause i have a few more tasks further down in the flow that also want to use
brand
. i ended up just making a copy of the flow and hardcoding the brand in both, but that doesn't feel right 😅
i did read the flatten part, but didn't see how that could help in my case, let me check again
k
would you be able to show the snowflake_queryfetch function? just be sure to take out credentials (if they are there)
c
Copy code
snowflake_queryfetch = SnowflakeExecuteFetch(
    account='asd',
    user='prefect',
    password=Secret("PREFECT_SNOWFLAKE_PW").get(),
    role='prefect',
    warehouse='prefect'
)
SnowflakeExecuteFetch
is a modified version of the built in snowflake tasks, just swapped a few lines to execute and fetchall
k
Got it. Will digest this and get back to you
c
thanks 🙂 time for weekend for me anyways xD
🎉 1
k
from typing import Tuple, List from prefect import Flow, task @task def func_one(x): return f”func_one {x}” @task def func_two(y): return f”func_two {y}” @task def func_three(other_string, x): print(f”{x}, {other_string}“) return f”{x}, {other_string}” @task def cross_prod(x_list, y_list) -> Tuple[List, List]: test = [(x, y) for x in x_list for y in y_list] return [x[0] for x in test], [x[1] for x in test] with Flow(“test”) as flow: brands = [“a”, “b”, “c”] po_nos_query = func_one.map(x=brands) po_nos = func_two.map(y=po_nos_query, upstream_tasks=[po_nos_query]) new_list, new_list2 = cross_prod(po_nos, brands) po_query = func_three.map(other_string=new_list, x=new_list2) flow.run()
what is your take on something like this? I tried to find a way for you to not have to edit your tasks
c
hmm, i think this would work if the values of
po_nos
is the same for every value of brand, which in my case is unfortunately not the case. e.g. on any given run i might get
[1, 2]
for brand
a
and
[4, 6, 8]
for brand
b
.
i'd be fine with editing the tasks. i feel like i'm not structuring this the right way afterall. i was running into a similar issue a few days before and what i did there was just passing the value along through my tasks. e.g. in the above scenario
brand
would always be returned by every task along with the actual result of the task. that worked, but also didn't really feel right.
one other thing i found was the prefect Parameters, but from the docs i didn't really see how i could apply this to my case. essentially all i want to do is make my flow run multiple times, once for each value of
brand
. e.g. if there was a way to register the flow once per value of
brand
, that would also work. didn't really find anything about that though.
k
Yeah I think this sounds like it would need some restructuring unfortunately. I don’t think Parameters is the answer. It’s probably editing the Python functions. I’ll reply here if I have an idea.
🙇 1