# ask-community
b
Hey there everyone. Curious if it is possible to map over a section of task dependencies. Like I have an E->L set of tasks that I want to map over a list of tables. Is that possible? It seems with map and apply_map it will still map over all the E's and then move on to mapping all the L's. I suppose this would be like mapping over a nested flow, but not a flow that is actually registered, rather one that resides inside a registered flow. Was testing with this:
Copy code
from time import sleep

from prefect import Flow, apply_map, task
from prefect.executors import DaskExecutor


@task()
def test_1(x):
    sleep(2)
    print(f"test_1 with {x}")
    return x


@task()
def test_2(x):
    sleep(2)
    print(f"test_2 with {x}")


def micro_flow(x):
    test_1_task = test_1(x)
    test_2(test_1_task)


with Flow(
    "example",
    executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 2}),
) as flow:
    apply_map(micro_flow, range(10))


if __name__ == "__main__":
    flow.run()
    # flow.visualize()
j
Hey Braun - do you mind adding a little more detail about your objective and how it differs from your example? Typically apply_map is used to help with conditions and branching within a map; it looks like in your example you end up with the same flow as if you just mapped explicitly.
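For context, a minimal sketch of the branching pattern apply_map targets, assuming Prefect 1.x (the is_large / handle_large / handle_small tasks here are hypothetical stand-ins): each mapped element only runs the branch whose condition holds.
Copy code
from prefect import Flow, apply_map, case, task
from prefect.tasks.control_flow import merge


@task
def is_large(x):
    return x > 5


@task
def handle_large(x):
    return f"large: {x}"


@task
def handle_small(x):
    return f"small: {x}"


def branching_pipeline(x):
    # each mapped element takes a different path through the DAG
    cond = is_large(x)
    with case(cond, True):
        big = handle_large(x)
    with case(cond, False):
        small = handle_small(x)
    return merge(big, small)


with Flow("branching-example") as flow:
    apply_map(branching_pipeline, range(10))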
b
sure....
so basically I have a 2+ step process that I want to map across a list of tables, like EL or ETL or something else....x leads to y then leads to z
x>y>z
that is the dag. So if I were to use map, it seems with mapping as currently used, if I passed in a list of tables [a,b,c,d] it would run x for [a,b,c,d], but when x for a was done it would not move on to y for a. I guess I am looking to map a sub dag of x>y>z across a list of [a,b,c,d]
this could certainly be a single task....but just curious to see if this functionality was possible
j
Ok got it — the good news is Prefect will already do what you want, we call this “depth-first execution,” where we go on to the next mapped task before every element of the first mapped task is complete. However, it requires you to set n_workers > 1. I admit that surprised me, I naively thought threads_per_worker was enough, but perhaps one of our Dask experts can give more clarity. Here is a slightly modified version of your flow to illustrate this more obviously. I changed the sleep times to be variable and also had the first task print its status in green and the second in blue; if you see the colors interleaved it means depth-first execution is happening:
Copy code
import click
from time import sleep
from prefect import Flow, apply_map, task
from prefect.executors import DaskExecutor


@task()
def test_1(x):
    sleep(x)
    click.secho(f"test_1 with {x}", fg='green')
    return x


@task()
def test_2(x):
    sleep(x)
    click.secho(f"test_2 with {x}", fg='blue')


def micro_flow(x):
    test_1_task = test_1(x)
    test_2(test_1_task)


with Flow(
    "example",
    executor=DaskExecutor(cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}),
) as flow:
    apply_map(micro_flow, range(4))


if __name__ == "__main__":
    flow.run()
    # flow.visualize()
Is that what you were looking for?
(note depth-first execution has some randomness, so you might not see it every time you run this)
b
got it....so it's not like this is guaranteed
Copy code
worker 1 runs...x(a)->y(a)
worker 2 runs...x(b)->y(b)
next available worker runs...x(c)->y(c)
j
Not guaranteed, as it still depends on Dask for scheduling, but the scheduler could definitely decide that's optimal for data locality.
Dask doesn't have a native equivalent of Prefect's dynamic multi-level maps (where the map width isn't known until runtime), so we're repackaging the map and submitting each item for scheduling via the Dask scheduler.
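For context, a minimal sketch of what submitting each item looks like at the plain dask.distributed level, assuming a local cluster (the x and y functions here are hypothetical stand-ins for the extract/load tasks): one x->y chain of futures is submitted per table, and the scheduler is then free to interleave the chains depth-first across workers.
Copy code
from time import sleep

from dask.distributed import Client


def x(item):
    sleep(1)
    return f"extracted {item}"


def y(extracted):
    sleep(1)
    return f"loaded {extracted}"


if __name__ == "__main__":
    client = Client(n_workers=2, threads_per_worker=1)
    # one x -> y chain of futures per table; ordering is left to the Dask scheduler
    chains = [client.submit(y, client.submit(x, item)) for item in ["a", "b", "c", "d"]]
    print(client.gather(chains))
    client.close()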