
Kevin Mullins

02/11/2022, 4:55 PM
I’m working on a flow that will fan out/fan in dynamically. For each path, there are linear steps that are chained together. I’m taking advantage of `map` and have used it successfully before for things like `create_flow_run` and `wait_for_flow_run`; however, the `wait_for_flow_run` input arguments exactly match the output from `create_flow_run`. I’m trying to think of how tasks can be chained together when they take mapped arguments from upstream that aren’t included in the direct parent’s output. For instance, several tasks that prepare/process/finalize something all need the same configuration information, which was the result of a `discover` task; however, this configuration information is not returned all the way through. I’m curious what would be a good approach here. If an original fan-out returns, say, a list of 5 results that get mapped, and downstream tasks keep returning different lists of 5 results, is it safe to pass these results downstream and match them up by index order?
```python
# pseudo code
five_config_results = discover()

five_state_results = prepare.map(five_config_results)

five_process_results = process.map(
    five_state_results, config_results=five_config_results
)

five_finalize_results = finalize.map(
    five_process_results,
    five_state_results=five_state_results,
    config_results=five_config_results,
)
```
Or would another approach be needed to capture the matching results from each task to give to others?
Ah, I may have answered it myself. This may be a good use case for `apply_map`. Going to prove it out and see: https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines
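In plain Python, the idea behind `apply_map` for this pipeline can be sketched as one per-element function that runs the whole chain, so the configuration from `discover` stays in scope instead of being threaded through each stage's output. This is a data-flow model only, assuming hypothetical stand-in stage functions that just record what they saw; in Prefect 1.x the function passed to `apply_map` would call `@task` functions inside a flow and build the graph rather than execute the steps directly.

```python
from typing import Dict, List

# Hypothetical stand-ins for the discover/prepare/process/finalize tasks from
# the pseudo code; each returns a string recording which inputs it received.


def discover() -> List[Dict[str, str]]:
    return [{"name": f"config-{i}"} for i in range(5)]


def prepare(config: Dict[str, str]) -> str:
    return f"state({config['name']})"


def process(state: str, config: Dict[str, str]) -> str:
    return f"processed({state}, {config['name']})"


def finalize(result: str, state: str, config: Dict[str, str]) -> str:
    return f"final({result}, {state}, {config['name']})"


def run_one(config: Dict[str, str]) -> str:
    # Per-element chain: config stays in scope for every step, so no stage
    # has to return it for the next one to use it.
    state = prepare(config)
    result = process(state, config)
    return finalize(result, state, config)


if __name__ == "__main__":
    for line in [run_one(c) for c in discover()]:
        print(line)
```

Each final string carries the same config name at every stage, which is the pairing the index-matching approach has to preserve by keeping list order and length intact.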

Kevin Kho

02/11/2022, 5:55 PM
I think you can do:
```python
with Flow(...) as flow:
    a = task_one.map(...)
    b = task_two.map(..., upstream_tasks=[a])
    c = task_three.map(..., upstream_tasks=[b])
```
and it will be a1 -> b1 -> c1. You need the lengths to match all the way through for this to work. If the lengths don’t match, then you can’t really do depth-first execution and need to use `unmapped()`:
```python
with Flow(...) as flow:
    a = task_one.map(...)
    b = task_two.map(..., upstream_tasks=[unmapped(a)])
    c = task_three.map(..., upstream_tasks=[unmapped(b)])
```
Does that answer your question? But `apply_map` is also a good solution.
`apply_map` is basically a function that adds tasks to the Flow.
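The difference between the two patterns is which upstream results each mapped child waits on. A minimal plain-Python model of that (no Prefect import; `build_edges` is a hypothetical helper, not a Prefect API) lists the dependency edges each form creates, following the description in this thread:

```python
from typing import List, Tuple

# Hypothetical edge representation: (upstream child, downstream child)
Edge = Tuple[str, str]


def build_edges(n_a: int, n_b: int, mapped: bool) -> List[Edge]:
    """Model which children of mapped task `a` each child of `b` waits on.

    mapped=True  ~ b = task_two.map(..., upstream_tasks=[a])
    mapped=False ~ b = task_two.map(..., upstream_tasks=[unmapped(a)])
    """
    if mapped:
        # Index-aligned: b[i] waits only on a[i]. This model rejects unequal
        # lengths, mirroring the advice that lengths must match all the way.
        if n_a != n_b:
            raise ValueError(f"mapped upstream needs equal lengths, got {n_a} and {n_b}")
        return [(f"a[{i}]", f"b[{i}]") for i in range(n_a)]
    # Unmapped: each b[i] waits for all of a, modeled as an edge from every a[j]
    return [(f"a[{j}]", f"b[{i}]") for i in range(n_b) for j in range(n_a)]


if __name__ == "__main__":
    print(build_edges(3, 3, mapped=True))
    print(len(build_edges(3, 2, mapped=False)))
```

With three children on each side, the mapped form only connects `a[i]` to `b[i]`, which is what allows independent a1 -> b1 -> c1 chains; the unmapped form connects every `a` child to every `b` child, so no `b` child can start until all of `a` has finished.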

Kevin Mullins

02/11/2022, 7:56 PM
The main reason I was thinking `apply_map` would be needed is to solve how to pass the results of both, say, task A and task B as arguments to C and D.

Kevin Kho

02/11/2022, 7:59 PM
If `a` is a list and `b` is a list, you can do:
```python
with Flow(...) as flow:
    a = task_one.map(...)
    b = task_two.map(...)
    c = task_three.map(a=a, b=b)
```
and `c` will map over the `task_one` outputs and the `task_two` outputs
As long as `a` and `b` are the same length
There is an example here
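As a plain-Python sketch of the pairing rule described here (not Prefect's implementation): mapping a task over several keyword arguments behaves like zipping them, so the i-th child receives the i-th element of each list. The `map_over` helper is hypothetical; it raises on unequal lengths to mirror the same-length requirement stated above, not to reproduce what Prefect itself does in that case.

```python
from typing import Any, Callable, List, Sequence


def map_over(fn: Callable[..., Any], **mapped: Sequence[Any]) -> List[Any]:
    """Call fn once per index, passing the i-th element of every mapped argument."""
    lengths = {name: len(values) for name, values in mapped.items()}
    if len(set(lengths.values())) > 1:
        raise ValueError(f"mapped arguments have different lengths: {lengths}")
    n = next(iter(lengths.values()), 0)
    return [fn(**{name: values[i] for name, values in mapped.items()}) for i in range(n)]


def start_car(car: str, key: str) -> str:
    return f"starting car '{car}' with key '{key}'"


if __name__ == "__main__":
    cars = ["Honda", "Toyota", "Ford"]
    keys = ["Gold", "Silver", "Blue"]  # keys[i] belongs to cars[i]
    for line in map_over(start_car, car=cars, key=keys):
        print(line)
```

The pairing is purely positional, so it is only correct if the upstream lists are produced in the same order, for example when `keys` is itself produced by mapping over `cars`.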

Kevin Mullins

02/11/2022, 8:02 PM
That’s kind of what I was originally thinking (I probably didn’t explain it well). I wasn’t sure mapping worked like that. So if `task_one` returns 10 `car` objects and `task_two` returns 10 `keys` objects, would `task_three` be executed 10 times with the matching sets of cars and keys (assuming they are returned in the same order)?
It seems to work; here is a little POC I put together. I just want to make sure this is defined behavior I can count on:
```python
from typing import List

import prefect
from prefect import Flow, task

car_keys = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


@task
def get_cars() -> List[str]:
    # Dicts keep insertion order, so the list of cars is deterministic
    return list(car_keys)


@task
def get_key_for_car(car: str) -> str:
    return car_keys[car]


@task
def start_car(car: str, key: str) -> None:
    prefect.context.logger.info(f"starting car '{car}' with key '{key}'")


with Flow("multi_map_args") as flow:
    cars = get_cars()
    # One mapped child per car, so keys[i] is the key for cars[i]
    keys = get_key_for_car.map(cars)
    # Mapping over two same-length lists pairs them by index: (cars[i], keys[i])
    start_car.map(car=cars, key=keys)

if __name__ == "__main__":
    flow.run()
```

Kevin Kho

02/11/2022, 8:26 PM
Yes, exactly.

Kevin Mullins

02/11/2022, 8:28 PM
This was the alternative form I thought of with `apply_map`:
```python
from typing import List

import prefect
from prefect import Flow, apply_map, task

car_keys = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


@task
def get_cars() -> List[str]:
    return list(car_keys)


@task
def get_key_for_car(car: str) -> str:
    return car_keys[car]


@task
def start_car(car: str, key: str) -> None:
    prefect.context.logger.info(f"starting car '{car}' with key '{key}'")


def get_key_and_start_car(car: str) -> None:
    # Plain function (not a task): apply_map maps it elementwise over `cars`,
    # so each car gets its own get_key_for_car -> start_car chain
    key = get_key_for_car(car)
    start_car(car, key)


with Flow("multi_map_args_apply") as flow:
    cars = get_cars()
    apply_map(get_key_and_start_car, cars)

if __name__ == "__main__":
    flow.run()
```

Kevin Kho

02/11/2022, 8:30 PM
Personally, I would avoid `apply_map` if possible. From what I have seen, there are edge cases where it can get finicky.

Kevin Mullins

02/11/2022, 8:30 PM
Glad I talked it out. I’ll go with the first route. Thank you very much!