# marvin-ai
r
Hi @Marvin, can I have a Dask flow calling a ConcurrentTask subflow and a SequentialTask subflow?
m
Yes, you can have a Dask flow that calls both `ConcurrentTask` and `SequentialTask` subflows in Prefect 3.x. Here's a concise explanation:

1. Dask Flow: Prefect supports Dask as a task runner, allowing you to parallelize task execution across a cluster. You can set up a Dask flow by using the `flow` decorator from Prefect.
2. ConcurrentTask and SequentialTask: These concepts aren't explicitly defined in the Prefect documentation, but you can achieve concurrent execution using Dask's parallel capabilities, and sequential execution by managing task dependencies.

Here's a basic outline of how you might structure such a flow:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def concurrent_task(item):
    # Task logic here
    return item

@task
def sequential_task():
    # Task logic here
    pass

@flow(task_runner=DaskTaskRunner())
def my_dask_flow():
    # Concurrent execution: .map() needs an iterable to fan out over
    futures = concurrent_task.map([1, 2, 3])
    results = [f.result() for f in futures]  # wait for the mapped runs

    # Sequential execution: runs only after the mapped runs have finished
    sequential_task()
    return results

if __name__ == "__main__":
    my_dask_flow()
```
### Useful Links
- Prefect Dask Integration
- Prefect Task and Flow API

This structure allows you to use Dask for parallel execution while manually controlling task order for sequential execution. If you have specific patterns for `ConcurrentTask` and `SequentialTask`, you might need to adapt the flow logic accordingly.
r
I mean a ConcurrentTaskRunner and a SequentialTaskRunner within a DaskTaskRunner
r
oh oops, meant to ask Marvin. Thanks Nate! But I have the following design in mind:

Parent_flow(task_runner=DaskTaskRunner()):
    child_flow.submit(param=a)
    child_flow.submit(param=b)

child_flow(task_runner=ConcurrentTaskRunner()):
    res_a = task_a.submit()
    res_b = task_b.submit()

The above code fails because flows don’t have a .submit() method. I basically want to run 2 concurrent flows in one Dask cluster, so the 2 concurrent flows can run simultaneously.
I’ll ask marvin too
@Marvin oh oops, meant to ask Marvin. Thanks Nate! But I have the following design in mind:

Parent_flow(task_runner=DaskTaskRunner()):
    child_flow.submit(param=a)
    child_flow.submit(param=b)

child_flow(task_runner=ConcurrentTaskRunner()):
    res_a = task_a.submit()
    res_b = task_b.submit()

The above code fails because flows don’t have a .submit() method. I basically want to run 2 concurrent flows in one Dask cluster, so the 2 concurrent flows can run simultaneously.
m
In Prefect 3.x, flows do not have a `.submit()` method. Instead, you call the flow function directly to execute it. To achieve your design of running two concurrent flows within a Dask cluster, you can structure your code like this:

1. Use the `DaskTaskRunner` to run the parent flow, which will manage the execution of child flows concurrently.
2. Use `ConcurrentTaskRunner` within each child flow to manage the parallel execution of tasks within that flow.

Here's how you can implement this:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ConcurrentTaskRunner

@task
def task_a():
    # Task logic for task_a
    return "Result A"

@task
def task_b():
    # Task logic for task_b
    return "Result B"

@flow(task_runner=ConcurrentTaskRunner())
def child_flow(param):
    res_a = task_a.submit()
    res_b = task_b.submit()
    return res_a, res_b

@flow(task_runner=DaskTaskRunner())
def parent_flow():
    # Run child flows concurrently
    child_flow(param="a")
    child_flow(param="b")

# Execute the parent flow (guarded so Dask worker processes don't re-run it)
if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- `DaskTaskRunner`: Manages the execution of the `parent_flow` on a Dask cluster.
- `ConcurrentTaskRunner`: Used within `child_flow` to run tasks `task_a` and `task_b` concurrently.
- Flow Execution: Instead of `.submit()`, use the flow name directly to execute it, as shown in `parent_flow`.

### Useful Links
- Prefect Task Runners
- Prefect Dask Integration

This setup will allow each `child_flow` to run its tasks concurrently, and both child flows will run simultaneously in the Dask cluster.
r
@Marvin How does the parent_flow() know to run the child_flows() concurrently when we’re not using submit?
m
In Prefect 3.x, the concurrency in the `parent_flow` is managed by the task runner specified for the flow, in this case, `DaskTaskRunner`. When you call `child_flow(param="a")` and `child_flow(param="b")` within `parent_flow`, these calls are scheduled by the `DaskTaskRunner` to run concurrently.

### How It Works:
- Task Runner: The `DaskTaskRunner` is responsible for managing the execution of tasks and flows. It can execute multiple tasks or subflows concurrently if there are no explicit dependencies between them.
- Flow Calls: When you call a subflow within a parent flow, the task runner treats these calls as tasks. Since `DaskTaskRunner` is designed to handle parallel execution, it will execute these subflows concurrently.

Here's the revised example with comments:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ConcurrentTaskRunner

@task
def task_a():
    return "Result A"

@task
def task_b():
    return "Result B"

@flow(task_runner=ConcurrentTaskRunner())
def child_flow(param):
    res_a = task_a.submit()
    res_b = task_b.submit()
    return res_a, res_b

@flow(task_runner=DaskTaskRunner())
def parent_flow():
    # These child flow calls are managed by DaskTaskRunner
    # and will run concurrently if there are no dependencies.
    child_flow(param="a")
    child_flow(param="b")

# Execute the parent flow (guarded so Dask worker processes don't re-run it)
if __name__ == "__main__":
    parent_flow()
```
### Key Points:
- Concurrency Management: The `DaskTaskRunner` handles the scheduling and execution of tasks and subflows concurrently.
- No Explicit `.submit()`: You don't need to use `.submit()` for flows; calling the flow function is sufficient when using task runners that support concurrency.

This design leverages Prefect's ability to manage task concurrency through its task runners, allowing you to focus on the flow logic without dealing with the intricacies of task scheduling.
n
🤨 those seem like bad links, lemme check what's going on - marvin might be flying blind right now
flows are not subject to task runners, only tasks are. if you wanna run flows concurrently, you can do it like you would with any python function, i.e.
• use a ThreadPoolExecutor
• asyncio gather
• etc
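Both suggestions can be sketched with the standard library alone. This is a minimal sketch, assuming the child flows are called in-process: `child_flow` and `async_child_flow` are hypothetical plain-function stand-ins so it runs without Prefect installed. In a real pipeline they would carry `@flow(task_runner=...)` decorators; since a flow call is an ordinary Python call, the executor or event loop should schedule them the same way.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for child flows. In a real pipeline these would be
# decorated with @flow(task_runner=...); plain functions keep the sketch runnable.
def child_flow(param: str) -> str:
    time.sleep(0.2)  # pretend blocking work
    return f"child {param} done"


async def async_child_flow(param: str) -> str:
    await asyncio.sleep(0.2)  # pretend non-blocking work
    return f"async child {param} done"


def run_with_threads(params):
    # Option 1: each call gets its own worker thread, so the sleeps overlap.
    with ThreadPoolExecutor(max_workers=len(params)) as pool:
        return list(pool.map(child_flow, params))


async def run_with_gather(params):
    # Option 2: gather schedules every coroutine on the event loop at once.
    return await asyncio.gather(*(async_child_flow(p) for p in params))


if __name__ == "__main__":
    print(run_with_threads(["a", "b"]))
    print(asyncio.run(run_with_gather(["a", "b"])))
```

The thread-pool route leaves sync child flows unchanged; the `gather` route only applies if the child flows are written as `async def`.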
r
I see. I just tried Marvin’s solution, and I can confirm that the child_flows() are NOT running in parallel like Marvin said they would.
@Nate it’d be nice if what I mentioned were a legitimate pattern 🙂
n
right bc those are sequential calls to a synchronous function
we try to avoid implicit diversions from normal python, so we'd likely not introduce that. this is a good reason to use `.submit` on tasks (which you're free to nest)
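To see why two back-to-back calls can't overlap, here is a deterministic stdlib-only sketch. The hypothetical `child` stand-in blocks on a `threading.Barrier(2)` until a second caller is running at the same time. Plain sequential calls never get there, so the barrier times out; submitting both calls to a thread pool lets them meet.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def make_child(barrier: threading.Barrier):
    """Build a hypothetical stand-in for a child flow that only completes
    once another call is running at the same time."""
    def child(param: str) -> str:
        barrier.wait(timeout=1)  # raises BrokenBarrierError if nobody else shows up
        return f"done {param}"
    return child


def call_sequentially() -> str:
    child = make_child(threading.Barrier(2))
    try:
        child("a")  # blocks here: the second call has not started yet
        child("b")
        return "calls overlapped"
    except threading.BrokenBarrierError:
        return "calls ran one after another"


def call_in_threads() -> list:
    child = make_child(threading.Barrier(2))
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(child, p) for p in ("a", "b")]
        return [f.result() for f in futures]


if __name__ == "__main__":
    print(call_sequentially())  # waits about 1 s for the timeout
    print(call_in_threads())
```

The same reasoning applies to real flow calls: a call like `child_flow(param="a")` returns only once that run is done, so the next line starts afterwards unless something such as `.submit()` or an executor introduces concurrency.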
r
It just seems like such a nice pattern within the Prefect ecosystem. IMO it will look odd to have Prefect-style concurrent/Dask code but have the parent_flow() fall back on normal Python, especially since I’m going to spin up a local Dask cluster anyway.
but thx for your explanation!
n
can you explain what pattern you're looking for exactly? like this? where thing 1 and 2 are sync tasks/flows and calling them like this makes them run concurrently?
```python
thing_1()
thing_2()
```
r
Let’s call them flows. flow_1() using ConcurrentTaskRunner() and it runs 3 tasks concurrently. flow_2() is the same thing. Now I want to use Dask to run flow_1() and flow_2() in parallel. It’s my understanding that ConcurrentTaskRunner() splits tasks into threads, whereas Dask splits tasks into separate processes.
in reality, flow_1() is called twice with different parameters, so it’s more like:
concurrent_flow_1(a)
concurrent_flow_1(b)
and I want to use Dask to run those concurrent flows in parallel.
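The shape described above (two calls of one flow, each running three tasks in threads) can be sketched with nested thread pools from the standard library. This is a minimal sketch with hypothetical names (`task_step`, `concurrent_flow_1`, `parent`); it also records `os.getpid()` to illustrate the threads point: every task lands in the same process. Getting separate processes, which a local Dask cluster typically provides, would mean replacing the outer pool with a process-based executor or a Dask cluster, which this sketch does not do.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def task_step(param, i):
    # Hypothetical stand-in for one of the three concurrent tasks.
    return (f"{param}-task{i}", os.getpid())


def concurrent_flow_1(param):
    # Inner level, like ConcurrentTaskRunner: three tasks share a thread pool.
    with ThreadPoolExecutor(max_workers=3) as inner:
        futures = [inner.submit(task_step, param, i) for i in range(3)]
        return [f.result() for f in futures]


def parent(params):
    # Outer level: one concurrent_flow_1 call per parameter, run at the same time.
    with ThreadPoolExecutor(max_workers=len(params)) as outer:
        return list(outer.map(concurrent_flow_1, params))


if __name__ == "__main__":
    for flow_result in parent(["a", "b"]):
        print(flow_result)
```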
n
```python
import time

from prefect_dask import DaskTaskRunner

from prefect import flow, task


@task
def process_item(item: str) -> str:
    time.sleep(2)
    return f"Completed {item}"


@flow(task_runner=DaskTaskRunner())
def run_many_things_with_dask(things: list[str]) -> list[str]:
    return process_item.map(things).result()


@flow
def parent_flow():
    return (
        task(run_many_things_with_dask)
        .map(things=[["thing-a", "thing-b"], ["thing-c", "thing-d"]])
        .result()
    )


if __name__ == "__main__":
    start = time.time()
    results = parent_flow()
    end = time.time()

    print(f"\nResults: {results}")
    print(f"Total time: {end - start:.2f} seconds")
```
since you're free to nest, I think the easiest way is to just wrap the flow in a task. maybe there's a future where you don't need a flow in order to select a task runner though
r
Interesting… so you wrap a Dask flow in a task, and the flow itself has a task that is called twice with map. I wonder if, in this example, two separate Dask clusters are created, each with its own threads/workers to run process_item() concurrently.
n
it'd depend on the kwargs you pass to the DaskTaskRunner right?
in this case yeah i think it did create 2
but you can also connect to an existing cluster
r
does the .map() in the parent_flow() create a new thread to run the two run_many_things_with_dask()? What exactly is that .map() doing?
I’m not too familiar with this syntax: task(run_many_things_with_dask).map(…)
n
.map is just .submit for a whole iterable, so it's using the default task runner (ThreadPoolExecutor) to run each task concurrently in a thread

> I’m not too familiar with this syntax:
> task(run_many_things_with_dask).map(…)

sorry, this is just a decorator inline; you could just as well say
```python
@task
@flow
def foo():
    ...
```
but I don't like that
for example
```console
» python -c "from prefect import flow; flow(lambda: 42)()"
16:58:28.189 | INFO    | Flow run 'honest-heron' - Beginning flow run 'honest-heron' for flow 'unknown-lambda'
16:58:28.191 | INFO    | Flow run 'honest-heron' - View at http://localhost:4200/runs/flow-run/e7a6765c-218f-4c78-9318-4d19dae24c6b
16:58:28.203 | INFO    | Flow run 'honest-heron' - Finished in state Completed()
```
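Both halves of that answer have plain-Python analogues. The sketch below is a stdlib-only illustration, not Prefect's implementation: `map_via_submit` is a hypothetical helper showing "one submit per element", and `bracket`/`paren` are hypothetical decorators showing that stacked decorators and an inline call are the same thing, just as `@task` over `@flow` amounts to `task(flow(foo))`.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


# 1) ".map is just .submit for a whole iterable"
def map_via_submit(pool, fn, items):
    """Hypothetical helper: one submit per element, returning the futures."""
    return [pool.submit(fn, item) for item in items]


def process_item(item):
    return f"Completed {item}"


with ThreadPoolExecutor() as pool:
    futures = map_via_submit(pool, process_item, ["thing-a", "thing-b"])
    mapped_results = [f.result() for f in futures]


# 2) Decorator syntax is sugar for calling the decorator on the function.
def bracket(fn):
    @functools.wraps(fn)
    def wrapper():
        return f"[{fn()}]"
    return wrapper


def paren(fn):
    @functools.wraps(fn)
    def wrapper():
        return f"({fn()})"
    return wrapper


@bracket  # applied last (outermost)
@paren    # applied first (innermost)
def foo():
    return "x"


def plain_foo():
    return "x"


inline_foo = bracket(paren(plain_foo))  # same wrapping, written inline

print(mapped_results)
print(foo(), inline_foo())
```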
r
ohhh icicic.
One quick follow up, what prefect version are you running?
n
i was running on prefect's main branch but that example above should work for any late 2.x or 3.x version
r
I’m getting:
```text
File "/home/smds/share_drive/ross.leung/sandbox/prefect_sandbox/dask_test_advanced.py", line 22, in parent_flow
    task(run_many_things_with_dask)
AttributeError: 'list' object has no attribute 'result'
```
n
what version?
r
It’s OK though, I think it’s just my old Python 3.8 env that’s causing the issue. I’m on prefect 2.16.9
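A possible explanation for the traceback above: as far as I know, in Prefect 2.x `Task.map` returns a plain Python `list` of `PrefectFuture` objects, while 3.x returns a `PrefectFutureList` that has its own `.result()`. That would produce `'list' object has no attribute 'result'` on 2.16.9 regardless of the Python version (separately, the `list[str]` annotations in the example need Python 3.9+; on 3.8 `typing.List[str]` is the usual substitute). A version-agnostic workaround is to resolve each future individually. The helper `resolve_all` below is hypothetical and is exercised with `concurrent.futures` futures so the sketch runs without Prefect.

```python
from concurrent.futures import ThreadPoolExecutor


def resolve_all(futures):
    """Hypothetical helper: call .result() on every future-like object.

    Works on a plain list of futures (what Prefect 2.x `Task.map` returns)
    and on any other iterable whose items expose .result().
    """
    return [future.result() for future in futures]


def process_item(item):
    return f"Completed {item}"


if __name__ == "__main__":
    # Demonstrated with stdlib futures so the sketch runs without Prefect.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process_item, t) for t in ["thing-a", "thing-b"]]
        print(resolve_all(futures))
```

In the parent-flow example, that would mean replacing `process_item.map(things).result()` with `resolve_all(process_item.map(things))`, and likewise for the outer `.map(...).result()`.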
thx again!
I’ll refactor my pipeline pattern to play more nicely
n