Florian Guily

06/14/2022, 10:12 AM
Hey, I was wondering if it is possible to use a map inside an apply_map in Prefect 1

Anna Geller

06/14/2022, 11:15 AM
apply_map is used for more complex mapped tasks, usually involving conditional branching, but it's just a normal function that gets applied over some list of inputs and thus creates a task. In theory, this normal function could call map with .run(), but I wouldn't recommend that; there is certainly a better way. (Btw, you don't need any of that in Prefect 2.0, so it may be easier to migrate later if you keep things simple.) Can you explain what you are trying to do? Talking about features without context is rarely helpful 😁

Kevin Kho

06/14/2022, 1:50 PM
I think this is not possible: it results in a two-stage mapping, which you can't do

Florian Guily

06/15/2022, 8:44 AM
This was mainly to mimic nested for loops.
I know Prefect 2 is perfectly suited for this, but this is on Prefect 1 for now.
The goal is to write records to the database, but some checks have to be done first, and some elements of each record have to be written to the database before the record itself.
The Python code should look like this:
records = get_records()
for record in records:
    tags = []
    # Look up the record first; only create it (and its tags) if it is missing
    record_id = get_record_id_if_exist(record)
    if record_id is None:
        for tag in record["tags"]:
            tag_id = get_record_tag_if_exist(tag)
            if tag_id is None:
                tag_id = create_tag(tag)
            tags.append(tag_id)
        # Tags must exist before the record that references them
        record_id = create_open_data(record["open_data"], tags)
    version_id = create_version(record_id, record["version"])
Maybe I can do an apply_map for the first for loop and run the second one sequentially?

Anna Geller

06/15/2022, 12:42 PM
How much visibility do you need here? If it's enough to see this as one "node" in your workflow graph, you can put the entire logic into a single task without mapping. It depends a lot on your observability needs.

Kevin Kho

06/15/2022, 12:55 PM
You can create a task that creates all possible combinations and make it a one stage map

Florian Guily

06/15/2022, 1:07 PM
If possible, I would like to have observability on all of the create functions, since they have to be retried if they fail (they are the ones that write to the database)

Kevin Kho

06/15/2022, 1:10 PM
Ah let me rephrase. You can create a task that creates all possible combinations and then feed that to apply_map

Florian Guily

06/15/2022, 1:49 PM
What do you mean by "all possible combinations"?

Kevin Kho

06/15/2022, 2:04 PM
Let me make a sample here
Starting from the Prefect example on `apply_map`, I just add another input called `y`, but it doesn't do anything:
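from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

# y is accepted by every task but deliberately unused; it only shows
# that a second mapped input can be carried along.

@task
def inc(x, y):
    return x + 1

@task
def negate(x, y):
    return -x

@task
def is_even(x, y):
    return x % 2 == 0

# A normal function (not a task) that adds the conditional branch to the flow
def inc_or_negate(x, y):
    cond = is_even(x, y)
    with case(cond, True):
        res1 = inc(x, y)
    with case(cond, False):
        res2 = negate(x, y)
    return merge(res1, res2)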
I believe your goal is like this:
for x in [1,2,3,4]:
    for y in ["A","B","C","D"]:
        inc_or_negate(x, y)
So we can make another task `cross_product`:
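@task(nout=2)
def cross_product(x_list, y_list):
    # Build every (x, y) pair, then split the pairs into two parallel lists
    # so they can be passed to apply_map as two mapped arguments.
    pairs = [(x, y) for x in x_list for y in y_list]
    res_x = [pair[0] for pair in pairs]
    res_y = [pair[1] for pair in pairs]
    return res_x, res_y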
and then run the Flow:
with Flow("apply-map example") as flow:
    # Same inputs as the nested loop above
    x, y = cross_product([1, 2, 3, 4], ["A", "B", "C", "D"])
    result = apply_map(inc_or_negate, x, y)

flow.run()
So `x` and `y` already represent all the possible combinations
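To make the shape of those two lists concrete, here is a plain-Python sketch with no Prefect involved. It uses `itertools.product` in place of the nested loops, which is a substitution for illustration rather than what the task above does internally, and `cross_product_lists` is a hypothetical name.

```python
from itertools import product


def cross_product_lists(x_list, y_list):
    """Return two parallel lists that together cover every (x, y) pair."""
    pairs = list(product(x_list, y_list))
    if not pairs:
        # zip(*[]) would yield nothing to unpack, so handle empty input here
        return [], []
    xs, ys = zip(*pairs)
    return list(xs), list(ys)


xs, ys = cross_product_lists([1, 2], ["A", "B"])
print(xs)  # [1, 1, 2, 2]
print(ys)  # ['A', 'B', 'A', 'B']
```

Element `i` of the first list is paired with element `i` of the second, which is the form a single-stage map over two arguments expects.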

Florian Guily

06/15/2022, 2:15 PM
oooh ok i see

Kevin Kho

06/15/2022, 2:17 PM
Yeah, it becomes a one-stage mapping problem instead of a two-stage one
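For the records use case, the inner list differs per record, so a full cross product would produce pairs that don't exist. A possible variation, sketched in plain Python under the assumption that each record is a dict with a `"tags"` list as in the pseudocode earlier in the thread, is to flatten only the real (record, tag) pairs. The name `flatten_record_tags` and the sample data are hypothetical; in Prefect 1 the function would presumably be wrapped with `@task(nout=2)` like `cross_product`.

```python
def flatten_record_tags(records):
    """Return parallel lists: a record index and one tag of that record."""
    record_indexes = []
    tags = []
    for index, record in enumerate(records):
        for tag in record["tags"]:
            record_indexes.append(index)
            tags.append(tag)
    return record_indexes, tags


# Hypothetical sample data, only to show the output shape
records = [
    {"tags": ["geo", "open"]},
    {"tags": ["geo"]},
]
indexes, tags = flatten_record_tags(records)
print(indexes)  # [0, 0, 1]
print(tags)     # ['geo', 'open', 'geo']
```

The two lists can then be mapped over in one stage, and the record index lets a later step regroup the tag ids per record.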

Florian Guily

06/15/2022, 2:17 PM
I'll give it a try, thanks!