YSF
09/09/2022, 1:06 AMflow = Flow("My imperative flow!")
# define some new tasks
name = Parameter("name")
second_add = add.copy()
# add our tasks to the flow
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)
# create non-data dependencies so that `say_hello` waits for `second_add` to finish.
say_hello.set_upstream(second_add, flow=flow)
# create data bindings
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
say_hello.bind(person=name, flow=flow)
If I wanted to give the add
task a mapped input?Jean Luciano
09/09/2022, 7:04 PMapply_map
function https://github.com/PrefectHQ/prefect/blob/929239eda6235fb91173ce63f292551c47b23004/src/prefect/utilities/tasks.py#L28YSF
09/09/2022, 7:08 PMfrom prefect import task, case, apply_map
from prefect.tasks.control_flow import merge
@task
def inc(x):
return x + 1
@task
def is_even(x):
return x % 2 == 0
def inc_if_even(x):
with case(is_even(x), True):
x2 = inc(x)
return merge(x, x2)
with Flow("example") as flow:
apply_map(inc_if_even, range(10))
Is this a typo? Is the function inc_if_even
not a task?inc
a range of inputs? How would I do that and how would I do that with the imperative api? It seems like in the imperative api inputs are added via the .bind method?Jean Luciano
09/09/2022, 7:19 PMYSF
09/09/2022, 9:22 PMtask.map
? So would that mean I would do task.bind()
for non mapped tasks and task.map()
for mapped tasks? I apologize I'm just new to this concept, can you show me in this example code how you would map a task:
flow = Flow("My imperative flow!")
# define some new tasks
name = Parameter("name")
second_add = add.copy()
# add our tasks to the flow
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)
# create non-data dependencies so that `say_hello` waits for `second_add` to finish.
say_hello.set_upstream(second_add, flow=flow)
# create data bindings
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
say_hello.bind(person=name, flow=flow)