Braun Reyes
02/23/2021, 1:32 PM
from time import sleep

from prefect import Flow, apply_map, task
from prefect.executors import DaskExecutor

@task()
def test_1(x):
    sleep(2)
    print(f"test_1 with {x}")
    return x

@task()
def test_2(x):
    sleep(2)
    print(f"test_2 with {x}")

def micro_flow(x):
    test_1_task = test_1(x)
    test_2(test_1_task)

with Flow(
    "example",
    executor=DaskExecutor(cluster_kwargs={"n_workers": 1, "threads_per_worker": 2}),
) as flow:
    apply_map(micro_flow, range(10))

if __name__ == "__main__":
    flow.run()
    # flow.visualize()
Jeremiah
apply_map is used to help with conditions and branching within a map; it looks like in your example you end up with the same flow as if you just mapped explicitly.
Braun Reyes
02/23/2021, 3:01 PM
Braun Reyes
02/23/2021, 3:08 PM
x>y>z that is the dag.
So if I were to use map, it seems that with mapping as currently used, if I passed in a list of tables [a,b,c,d], it would run x for [a,b,c,d], but when x for a was done it would not move on to y for a. I guess I am looking to map a sub dag of x>y>z across a list of [a,b,c,d].
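The two orderings being contrasted here can be sketched in plain Python. This is only an illustration of the orderings, not of how Prefect or Dask actually schedule mapped tasks; the function names breadth_first and depth_first are hypothetical, and the stage and table names are taken from the message above.

```python
def breadth_first(stages, items):
    """Finish one stage for every item before starting the next stage."""
    return [f"{stage}({item})" for stage in stages for item in items]


def depth_first(stages, items):
    """Run the whole chain of stages for one item before starting the next item."""
    return [f"{stage}({item})" for item in items for stage in stages]


if __name__ == "__main__":
    stages = ["x", "y", "z"]        # the sub dag x>y>z
    tables = ["a", "b", "c", "d"]   # the list it is mapped across
    print("breadth-first:", breadth_first(stages, tables))
    print("depth-first:  ", depth_first(stages, tables))
```

In the breadth-first ordering, y(a) cannot start until x(d) has finished; in the depth-first ordering, y(a) follows x(a) directly, which is the behavior being asked for.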
Braun Reyes
02/23/2021, 3:38 PM
Jeremiah
n_workers > 1. I admit that surprised me; I naively thought threads_per_worker was enough, but perhaps one of our Dask experts can give more clarity.
Here is a slightly modified version of your flow to illustrate this more obviously. I changed the sleep times to be variable and also had the first task print its status in green and the second in blue; if you see the colors interleaved, it means depth-first execution is happening:
Jeremiah
import click
from time import sleep

from prefect import Flow, apply_map, task
from prefect.executors import DaskExecutor

@task()
def test_1(x):
    sleep(x)
    click.secho(f"test_1 with {x}", fg='green')
    return x

@task()
def test_2(x):
    sleep(x)
    click.secho(f"test_2 with {x}", fg='blue')

def micro_flow(x):
    test_1_task = test_1(x)
    test_2(test_1_task)

with Flow(
    "example",
    executor=DaskExecutor(cluster_kwargs={"n_workers": 2, "threads_per_worker": 1}),
) as flow:
    apply_map(micro_flow, range(4))

if __name__ == "__main__":
    flow.run()
    # flow.visualize()
Braun Reyes
02/23/2021, 4:37 PM
worker 1 runs...x(a)->y(a)
worker 2 runs...x(b)->y(b)
next available worker runs...x(c)->y(c)
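The worker pattern described above can be mimicked with the standard library alone. The sketch below is a loose analogy using a thread pool, not Prefect's or Dask's scheduler: each pool worker takes one table and runs the whole x->y chain for it before picking up the next table. The names run_sub_dags and sub_dag are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_sub_dags(tables, workers=2):
    """Run x(table) -> y(table) per table on a small pool and return (results, log)."""
    log = []                 # (worker name, event) pairs in the order they happened
    lock = threading.Lock()  # guards appends to the shared log

    def record(event):
        with lock:
            log.append((threading.current_thread().name, event))

    def sub_dag(table):
        # The same worker runs the full chain for this table.
        record(f"x({table})")
        record(f"y({table})")
        return table

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(sub_dag, tables))
    return results, log


if __name__ == "__main__":
    results, log = run_sub_dags(["a", "b", "c"])
    for worker, event in log:
        print(worker, "runs", event)
```

The interleaving between tables varies from run to run, but within each table x always precedes y, and a free worker moves on to the next table as soon as it finishes its current chain.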