YSF

09/09/2022, 1:06 AM
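How do I add mapped tasks in the imperative API style? For example:
from prefect import Flow, Parameter

# `add` and `say_hello` are assumed to be tasks defined earlier (e.g. with @task)
flow = Flow("My imperative flow!")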

# define some new tasks
name = Parameter("name")
second_add = add.copy()

# add our tasks to the flow
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)

# create non-data dependencies so that `say_hello` waits for `second_add` to finish.
say_hello.set_upstream(second_add, flow=flow)

# create data bindings
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
say_hello.bind(person=name, flow=flow)
What if I wanted to give the add task a mapped input?

YSF

09/09/2022, 7:08 PM
@Jean Luciano, I was looking at the example for this function:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def is_even(x):
    return x % 2 == 0

def inc_if_even(x):
    with case(is_even(x), True):
        x2 = inc(x)
    return merge(x, x2)

with Flow("example") as flow:
    apply_map(inc_if_even, range(10))
Is this a typo? Is the function inc_if_even not a task?
What if I wanted to give inc a range of inputs? How would I do that, and how would I do it with the imperative API? It seems that in the imperative API, inputs are added via the .bind method?

Jean Luciano

09/09/2022, 7:19 PM
I did a bit more research, and I think this is the way to do it in the imperative API: https://github.com/PrefectHQ/prefect/blob/929239eda6235fb91173ce63f292551c47b23004/src/prefect/core/task.py#L766

YSF

09/09/2022, 9:22 PM
Is that not the functional API, task.map? So would that mean I would use task.bind() for non-mapped tasks and task.map() for mapped tasks? I apologize, I'm just new to this concept. Can you show me how you would map a task in this example code:
flow = Flow("My imperative flow!")

# define some new tasks
name = Parameter("name")
second_add = add.copy()

# add our tasks to the flow
flow.add_task(add)
flow.add_task(second_add)
flow.add_task(say_hello)

# create non-data dependencies so that `say_hello` waits for `second_add` to finish.
say_hello.set_upstream(second_add, flow=flow)

# create data bindings
add.bind(x=1, y=2, flow=flow)
second_add.bind(x=add, y=100, flow=flow)
say_hello.bind(person=name, flow=flow)