Kevin Mullins
02/11/2022, 4:55 PM
… `map` and have been successful with it previously for things like `create_flow_run` and `wait_for_flow_run`; however, the `wait_for_flow_run` input arguments exactly match the output from `create_flow_run`. I’m trying to think of how tasks can be chained together when they take mapped arguments from upstream that aren’t included in the direct parent’s output. For instance, several of the tasks I use to prepare/process/finalize something need the same configuration information, which was the result of a `discover` task; however, this configuration information is not returned all the way through.
I’m curious what a good approach to something like this would be. If I have an original fan-out that, say, returns a list of 5 results that get mapped, and the tasks keep returning different lists of 5 results, is it safe to pass these results downstream and match them up by index order?
```python
# pseudo code
five_config_results = discover()
five_state_results = prepare.map(five_config_results)
five_process_results = process.map(
    five_state_results, config_results=five_config_results
)
five_finalize_results = finalize.map(
    five_process_results,
    five_state_results=five_state_results,
    config_results=five_config_results,
)
```
Or would another approach be needed to capture the matching results from each task and give them to the others?
… `apply_map`. Going to prove it out and see:
https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines
Kevin Kho
```python
with Flow(...) as flow:
    a = task_one.map(..)
    b = task_two.map(.., upstream_tasks=[a])
    c = task_three.map(.., upstream_tasks=[b])
```
and it will be a1 -> b1 -> c1. As long as the lengths match all the way through, this will work. If you don’t have even lengths, then you can’t really do depth-first execution, and you need to use `unmapped()`:
```python
with Flow(...) as flow:
    a = task_one.map(..)
    b = task_two.map(.., upstream_tasks=[unmapped(a)])
    c = task_three.map(.., upstream_tasks=[unmapped(b)])
```
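A minimal pure-Python model of the two behaviors described above may help. It does not use Prefect. `fake_map` and `Unmapped` are hypothetical stand-ins for `Task.map` and `unmapped()`, and the tasks mirror the pseudo code from the question.

- Mapped arguments are paired by index, so child i receives element i of every mapped input.
- An `Unmapped` value is passed whole to every child.
- The model rejects mismatched lengths outright rather than guessing what Prefect does in that case.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical stand-in for unmapped(): every child gets the whole value."""

    def __init__(self, value: Any) -> None:
        self.value = value


def fake_map(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call fn once per index, pairing mapped arguments by position (like zip)."""
    mapped = [a for a in [*args, *kwargs.values()] if not isinstance(a, Unmapped)]
    lengths = {len(a) for a in mapped}
    if len(lengths) != 1:
        raise ValueError(f"mapped arguments need one common length, got {sorted(lengths)}")
    n = lengths.pop()

    def pick(arg: Any, i: int) -> Any:
        # Unmapped values are broadcast; mapped values contribute their i-th element.
        return arg.value if isinstance(arg, Unmapped) else arg[i]

    return [
        fn(*[pick(a, i) for a in args], **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


# Hypothetical tasks mirroring the pseudo code in the question.
def discover() -> List[str]:
    return [f"config-{i}" for i in range(5)]


def prepare(config: str) -> str:
    return f"state({config})"


def process(state: str, config_results: str) -> str:
    return f"process({state}, {config_results})"


five_config_results = discover()
five_state_results = fake_map(prepare, five_config_results)
# Both mapped inputs have length 5, so child i pairs state i with config i.
five_process_results = fake_map(
    process, five_state_results, config_results=five_config_results
)
print(five_process_results[2])  # process(state(config-2), config-2)

# Wrapping a value in Unmapped hands the same object to every child instead.
shared = fake_map(process, five_state_results, config_results=Unmapped("shared"))
print(shared[0])  # process(state(config-0), shared)
```

In this model, passing `five_config_results` again further downstream is safe only because every list in the chain keeps the same length and order.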
Does that answer your question?
But `apply_map` is also a good solution.
Kevin Mullins
02/11/2022, 7:56 PM
Kevin Kho
If `a` is a list and `b` is a list, you can do:
```python
with Flow(...) as flow:
    a = task_one.map(..)
    b = task_two.map(..)
    c = task_three.map(a=a, b=b)
```
and `c` will map over the task_one outputs and the task_two outputs together, element by element.
Kevin Mullins
02/11/2022, 8:02 PM
… `car` objects and task_two returns 10 `keys` objects. Would task_three be executed 10 times, given the matching sets of cars and keys (assuming they are returned in the same order)?
```python
from typing import List

import prefect
from prefect import Flow, task

car_keys = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


@task
def get_cars() -> List[str]:
    cars = [k for k in car_keys.keys()]
    return cars


@task
def get_key_for_car(car) -> str:
    key = car_keys[car]
    return key


@task
def start_car(car, key) -> None:
    prefect.context.logger.info(f"starting car '{car}' with key '{key}'")


with Flow("multi_map_args") as flow:
    cars = get_cars()
    keys = get_key_for_car.map(cars)
    # Both inputs have the same length, so each mapped child gets a car and the key looked up for it.
    start_car.map(car=cars, key=keys)
```
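The pairing in this example can be checked without Prefect. Below is a minimal plain-Python model. It assumes, as described above, that `start_car.map(car=cars, key=keys)` pairs element i of `cars` with element i of `keys`.

- List comprehensions and `zip` stand in for the mapped calls.
- This hypothetical `start_car` returns its message instead of logging it, so the result can be inspected.

The model also shows the caveat behind "assuming they are returned in the same order": if one list is reordered independently, index pairing silently mismatches cars and keys.

```python
from typing import Dict, List, Tuple

car_keys: Dict[str, str] = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


def get_cars() -> List[str]:
    # Dicts keep insertion order (Python 3.7+), so the car order is stable.
    return list(car_keys)


def get_key_for_car(car: str) -> str:
    return car_keys[car]


def start_car(car: str, key: str) -> str:
    # Returns the message rather than logging it so the result can be inspected.
    return f"starting car '{car}' with key '{key}'"


cars = get_cars()
# Models get_key_for_car.map(cars): key i is computed from car i.
keys = [get_key_for_car(car) for car in cars]
# Models start_car.map(car=cars, key=keys): pair the two lists by index.
messages = [start_car(car, key) for car, key in zip(cars, keys)]
for message in messages:
    print(message)

# Caveat: reordering one list on its own breaks the index pairing.
shuffled_keys = sorted(keys)
bad_pairs: List[Tuple[str, str]] = [
    (car, key) for car, key in zip(cars, shuffled_keys) if car_keys[car] != key
]
print(f"{len(bad_pairs)} of {len(cars)} pairs mismatched after sorting the keys")
# 5 of 5 pairs mismatched after sorting the keys
```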
Kevin Kho
Kevin Mullins
02/11/2022, 8:28 PM
```python
from typing import List

import prefect
from prefect import Flow, task, apply_map

car_keys = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


@task
def get_cars() -> List[str]:
    cars = [k for k in car_keys.keys()]
    return cars


@task
def get_key_for_car(car) -> str:
    key = car_keys[car]
    return key


@task
def start_car(car, key) -> None:
    prefect.context.logger.info(f"starting car '{car}' with key '{key}'")


# Plain function (not a task): apply_map applies the tasks it calls elementwise over cars.
def get_key_and_start_car(car: str) -> None:
    key = get_key_for_car(car)
    start_car(car, key)


with Flow("multi_map_args_apply") as flow:
    cars = get_cars()
    apply_map(get_key_and_start_car, cars)
```
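Here is a plain-Python analogue of the `apply_map` version. It runs without Prefect, and these hypothetical helpers return strings instead of logging. `apply_map` applies `get_key_and_start_car` elementwise across `cars`. As a result, each car and its key are produced within the same per-element chain, and they never have to be lined up across two independent lists. In this sketch, that per-element application is modeled with a list comprehension.

```python
from typing import Dict, List

car_keys: Dict[str, str] = {
    "Honda": "Gold",
    "Toyota": "Silver",
    "Ford": "Blue",
    "Chevy": "Green",
    "Ferrari": "Red",
}


def get_key_for_car(car: str) -> str:
    return car_keys[car]


def start_car(car: str, key: str) -> str:
    return f"starting car '{car}' with key '{key}'"


def get_key_and_start_car(car: str) -> str:
    # The key is looked up from the very car it will be paired with.
    key = get_key_for_car(car)
    return start_car(car, key)


# Models apply_map(get_key_and_start_car, cars): one chain per car.
cars: List[str] = list(car_keys)
messages = [get_key_and_start_car(car) for car in cars]
for message in messages:
    print(message)
```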
Kevin Kho
Kevin Mullins
02/11/2022, 8:30 PM