Christian Eik

    1 year ago
    hey everyone, beginner question: i have a flow that looks like this:
    with Flow('fiege_po_uploads', schedule=cron_schedule) as fiege_po_uploads:
        po_nos_query = build_po_nos_query.map(brand=brands)
        po_nos = snowflake_queryfetch.map(
            query=po_nos_query, upstream_tasks=[po_nos_query])
        po_query = build_po_data_query.map(
            po_no=po_nos, brand=brands, upstream_tasks=[po_nos])
    brands is a list of 2 strings. in the third task I'm trying to use 2 mapped arguments and i can kinda understand that this doesn't really work. however what is the proper way to do something like this? i'm a complete beginner to both prefect and anything functional, so i probably have a conceptual misunderstanding here. essentially what i'm trying to do here is having an outside loop over brands, an inside loop over po_nos, and be able to use the value of brands in every inner iteration.
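For reference, the nested loop described above can be sketched in plain Python, without Prefect. The three functions are hypothetical stand-ins for the tasks in the flow, and the table names and returned PO numbers are made up for illustration.

```python
# Hypothetical stand-ins for the three tasks; table names and data are made up.
def build_po_nos_query(brand):
    return f"SELECT po_no FROM purchase_orders WHERE brand = '{brand}'"


def fetch_po_nos(query):
    # Pretend query result: each brand has its own list of PO numbers.
    fake_results = {"a": [1, 2], "b": [4, 6, 8]}
    brand = query.split("'")[1]
    return fake_results[brand]


def build_po_data_query(po_no, brand):
    return f"SELECT * FROM po_data WHERE brand = '{brand}' AND po_no = {po_no}"


def run(brands):
    queries = []
    for brand in brands:  # outer loop over brands
        po_nos = fetch_po_nos(build_po_nos_query(brand))
        for po_no in po_nos:  # inner loop over that brand's PO numbers
            queries.append(build_po_data_query(po_no=po_no, brand=brand))
    return queries


po_queries = run(["a", "b"])
```

The point of the sketch is that the inner loop's length depends on the brand, and the brand stays in scope for every inner iteration.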
    Kevin Kho

    1 year ago
    is the list of brands something you want to parameterize? did you consider just making a list of queries and then mapping that?
    Have you also taken a look at the Flatten part of the docs? This might help think through this: https://docs.prefect.io/core/concepts/mapping.html#flat-mapping
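As a rough model of what flat-mapping does to a nested mapped result, here is a plain-Python sketch. It does not use Prefect itself, and the nested list is made-up example data standing in for one list of PO numbers per brand.

```python
from itertools import chain

# Made-up nested result: one list of PO numbers per brand,
# as a task mapped over two brands might return.
po_nos_per_brand = [[1, 2], [4, 6, 8]]

# Flattening removes one level of nesting, so a later map
# would run once per PO number instead of once per brand.
flat_po_nos = list(chain.from_iterable(po_nos_per_brand))
```

Note that the flat list on its own no longer records which brand each PO number came from.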
    Christian Eik

    1 year ago
    the list of queries approach doesn't really work in my case cause i have a few more tasks further down in the flow that also want to use brand. i ended up just making a copy of the flow and hardcoding the brand in both, but that doesn't feel right 😅
    i did read the flatten part, but didn't see how that could help in my case, let me check again
    Kevin Kho

    1 year ago
    would you be able to show the snowflake_queryfetch function? just be sure to take out credentials (if they are there)
    Christian Eik

    1 year ago
    snowflake_queryfetch = SnowflakeExecuteFetch(
        account='asd',
        user='prefect',
        password=Secret("PREFECT_SNOWFLAKE_PW").get(),
        role='prefect',
        warehouse='prefect'
    )
    SnowflakeExecuteFetch is a modified version of the built in snowflake tasks, just swapped a few lines to execute and fetchall
    Kevin Kho

    1 year ago
    Got it. Will digest this and get back to you
    Christian Eik

    1 year ago
    thanks 🙂 time for weekend for me anyways xD
    Kevin Kho

    1 year ago
    from typing import Tuple, List

    from prefect import Flow, task


    @task
    def func_one(x):
        return f"func_one {x}"


    @task
    def func_two(y):
        return f"func_two {y}"


    @task
    def func_three(other_string, x):
        print(f"{x}, {other_string}")
        return f"{x}, {other_string}"


    @task
    def cross_prod(x_list, y_list) -> Tuple[List, List]:
        # Build every (x, y) combination, then split it back into two parallel lists
        test = [(x, y) for x in x_list for y in y_list]
        return [x[0] for x in test], [x[1] for x in test]


    with Flow("test") as flow:
        brands = ["a", "b", "c"]
        po_nos_query = func_one.map(x=brands)
        po_nos = func_two.map(y=po_nos_query, upstream_tasks=[po_nos_query])
        new_list, new_list2 = cross_prod(po_nos, brands)
        po_query = func_three.map(other_string=new_list, x=new_list2)

    flow.run()
    what is your take on something like this? I tried to find a way for you to not have to edit your tasks
    Christian Eik

    1 year ago
    hmm, i think this would work if the values of po_nos are the same for every value of brand, which in my case is unfortunately not the case. e.g. on any given run i might get [1, 2] for brand a and [4, 6, 8] for brand b.
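To make the objection concrete, here is a plain-Python sketch comparing what a cross product along the lines of cross_prod would pair up with what is actually wanted. It uses the example values from the message above and does not involve Prefect.

```python
brands = ["a", "b"]
# One list of PO numbers per brand, in the same order as brands.
po_nos_per_brand = [[1, 2], [4, 6, 8]]

# Cross product: every per-brand result is paired with every brand,
# including mismatches such as ([1, 2], "b").
cross = [(po_nos, brand) for po_nos in po_nos_per_brand for brand in brands]

# Wanted: each PO number paired only with the brand it was fetched for.
wanted = [
    (brand, po_no)
    for brand, po_nos in zip(brands, po_nos_per_brand)
    for po_no in po_nos
]
```

The cross product only matches the wanted pairs when every brand returns the same PO numbers.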
    i'd be fine with editing the tasks. i feel like i'm not structuring this the right way after all. i was running into a similar issue a few days before and what i did there was just passing the value along through my tasks. e.g. in the above scenario brand would always be returned by every task along with the actual result of the task. that worked, but also didn't really feel right.
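The "pass the value along" approach might look roughly like this in plain Python. The functions are hypothetical stand-ins for the tasks, the data and table name are made up, and the list comprehensions only imitate what mapping and flattening would do in a flow; the actual Prefect wiring is not shown or tested here.

```python
# Hypothetical stand-ins for the tasks; data and table name are made up.
def fetch_po_nos(brand):
    fake_results = {"a": [1, 2], "b": [4, 6, 8]}
    # Return the brand together with each PO number so it travels downstream.
    return [(brand, po_no) for po_no in fake_results[brand]]


def build_po_data_query(pair):
    brand, po_no = pair
    return f"SELECT * FROM po_data WHERE brand = '{brand}' AND po_no = {po_no}"


nested = [fetch_po_nos(brand) for brand in ["a", "b"]]    # like mapping over brands
pairs = [pair for per_brand in nested for pair in per_brand]  # like flattening
queries = [build_po_data_query(pair) for pair in pairs]   # like mapping over pairs
```

Because every element carries its own brand, downstream steps take a single mapped argument and the per-brand list lengths can differ freely.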
    one other thing i found was the prefect Parameters, but from the docs i didn't really see how i could apply this to my case. essentially all i want to do is make my flow run multiple times, once for each value of brand. e.g. if there was a way to register the flow once per value of brand, that would also work. didn't really find anything about that though.
    Kevin Kho

    1 year ago
    Yeah I think this sounds like it would need some restructuring unfortunately. I don’t think Parameters is the answer. It’s probably editing the Python functions. I’ll reply here if I have an idea.