Shiyu Gan

03/23/2022, 3:24 AM
When DaskExecutor is used, does Prefect delegate DAG (of tasks) scheduling all to DaskScheduler?

Kevin Kho

03/23/2022, 3:27 AM
I think the answer is: not quite. Tasks are submitted immediately using client.submit(). The exception is mapping: if you have A -> B -> C, then the whole A1 -> B1 -> C1 branch is submitted. The Prefect DAG still has features the Dask DAG doesn't have. For example, Prefect is still responsible for things like caching across flow runs, or triggers that determine whether or not to run a task.
All of this happens before anything is submitted to Dask.
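As a rough illustration of the point above that some decisions are made before anything reaches Dask, here is a toy model in plain Python. It is not Prefect code: the function names (all_successful, maybe_submit), the string states, and the in-memory cache dict are hypothetical, and a standard-library ThreadPoolExecutor stands in for the Dask client.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy model (hypothetical, not Prefect code): decisions such as cache hits and
# triggers are made *before* the task is handed to the executor. A
# ThreadPoolExecutor stands in for the Dask client.

def all_successful(upstream_states):
    """Trigger in the spirit of 'run only if every upstream task succeeded'."""
    return all(state == "Success" for state in upstream_states)

def maybe_submit(executor, fn, upstream_states, cache, cache_key):
    # 1. Cache check: a result from an earlier run means nothing is submitted.
    if cache_key in cache:
        return "Cached", cache[cache_key]
    # 2. Trigger check: decide whether the task should run at all.
    if not all_successful(upstream_states):
        return "TriggerFailed", None
    # 3. Only now is work handed to the executor (client.submit() with Dask).
    result = executor.submit(fn).result()
    cache[cache_key] = result
    return "Success", result

with ThreadPoolExecutor() as pool:
    cache = {}
    print(maybe_submit(pool, lambda: 42, ["Success"], cache, "answer"))  # ('Success', 42)
    print(maybe_submit(pool, lambda: 0, ["Success"], cache, "answer"))   # ('Cached', 42)
    print(maybe_submit(pool, lambda: 1, ["Failed"], cache, "other"))     # ('TriggerFailed', None)
```

The second call never reaches the executor at all, which is the sense in which the cache and trigger logic live on the Prefect side rather than in the Dask graph.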

Shiyu Gan

03/23/2022, 3:32 AM
Consider a simple DAG: A -> B. Who (Prefect or Dask) takes care of the dependency scheduling that ensures B only starts after A is done?

Kevin Kho

03/23/2022, 3:34 AM
Prefect
Ah, well, for mapping like the example above, it's Dask. Without mapping, it's Prefect.
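A minimal sketch of what "for mapping it's Dask" means, assuming the mapped branches are submitted depth-first as described earlier in the thread: each branch A_i -> B_i -> C_i is submitted up front, and each downstream call receives its upstream future as an argument, so the executor, not a flow-level loop, enforces the ordering. Dask's client.submit() resolves futures passed as arguments; here a hypothetical resolve() wrapper does the same on top of a ThreadPoolExecutor. The task functions a, b and c are made up.

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Toy model of depth-first mapping (hypothetical, not Prefect code): every
# branch is submitted immediately, and ordering within a branch is enforced
# inside the executor because upstream futures are passed as arguments.

def resolve(fn):
    """Wrap fn so that any Future arguments are waited on inside the worker."""
    def wrapper(*args):
        return fn(*(arg.result() if isinstance(arg, Future) else arg for arg in args))
    return wrapper

def a(x):
    return x + 1

def b(x):
    return x * 10

def c(x):
    return f"result={x}"

def run_mapped(items, pool):
    finals = []
    for item in items:
        fa = pool.submit(resolve(a), item)  # A_i
        fb = pool.submit(resolve(b), fa)    # B_i waits on A_i in the worker
        fc = pool.submit(resolve(c), fb)    # C_i waits on B_i in the worker
        finals.append(fc)
    return [f.result() for f in finals]

with ThreadPoolExecutor(max_workers=4) as pool:
    print(run_mapped([1, 2, 3], pool))  # ['result=20', 'result=30', 'result=40']
```

Because the pool's queue is first-in first-out and every upstream is submitted before its downstream, a waiting worker is always waiting on a task that is already running or finished.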

Shiyu Gan

03/23/2022, 3:39 AM
> Tasks are submitted immediately using client.submit().
If tasks are submitted immediately, how is the "dependency wait" honoured?

Kevin Kho

03/23/2022, 3:41 AM
Because Prefect knows the DAG, it can traverse it at run time and then submit the next task.

Shiyu Gan

03/23/2022, 3:44 AM
So Prefect "blocks" until A is complete, then schedules B?

Kevin Kho

03/23/2022, 3:45 AM
Yes, because if A fails or raises SKIP, Prefect doesn't want to execute B.
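The blocking behaviour described above can be modelled with a toy flow-level loop. This is a hypothetical simplification, not Prefect's FlowRunner: tasks are assumed to arrive already in dependency order, Skip stands in for Prefect's SKIP signal, and the state names only loosely mirror Prefect's. Waiting on every task in turn also gives up parallelism, which is a deliberate simplification here.

```python
from concurrent.futures import ThreadPoolExecutor

class Skip(Exception):
    """Stand-in for Prefect's SKIP signal (hypothetical)."""

def run_flow(tasks, upstream, pool):
    """Toy flow-level loop, not Prefect's FlowRunner.

    tasks: dict of name -> callable, assumed to be in topological order.
    upstream: dict of name -> list of upstream task names.
    """
    states, results = {}, {}
    for name, fn in tasks.items():
        deps = upstream.get(name, [])
        dep_states = [states[d] for d in deps]
        # Upstream states are already final here, because the loop waited on
        # each upstream task before moving on ("blocking").
        if "Skipped" in dep_states:
            states[name] = "Skipped"  # in this toy, a skip propagates downstream
            continue
        if any(s != "Success" for s in dep_states):
            states[name] = "TriggerFailed"  # B is never submitted if A failed
            continue
        future = pool.submit(fn, *(results[d] for d in deps))
        try:
            results[name] = future.result()  # wait before visiting the next task
            states[name] = "Success"
        except Skip:
            states[name] = "Skipped"
        except Exception:
            states[name] = "Failed"
    return states

def a_ok():
    return 1

def a_fail():
    raise ValueError("boom")

def a_skip():
    raise Skip()

def b(x):
    return x + 1

with ThreadPoolExecutor() as pool:
    for a in (a_ok, a_fail, a_skip):
        print(run_flow({"A": a, "B": b}, {"B": ["A"]}, pool))
# {'A': 'Success', 'B': 'Success'}
# {'A': 'Failed', 'B': 'TriggerFailed'}
# {'A': 'Skipped', 'B': 'Skipped'}
```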

Shiyu Gan

03/23/2022, 3:48 AM
Which line in flow_runner.py's get_flow_run_state is the "block" logic?

Kevin Kho

03/23/2022, 3:51 AM
It’s not that simple because there are many checks and the flow runner is pretty hard to read. I can try looking for it, but is there something you are trying to do?

Shiyu Gan

03/23/2022, 3:56 AM
Can you give me some pointers? I can do the reading myself. I'm just trying to understand Prefect's implementation code.
Thanks! 🙏

Kevin Kho

03/23/2022, 3:59 AM
So you can start here in the FlowRunner, and notice that the tasks are sorted. I believe that's the order of submission. Then there is a bunch of logic for mapping, and the task gets submitted here.
By that point, it has already been submitted to the DaskExecutor, so you can look at DaskExecutor.run().
Then notice in the TaskRunner here that this runs recursively, checks the conditions for running the task, and only actually runs the task at the end, after all the checks.
So the answer to your question: the tasks in the Flow are sorted, and then submitted in that order. The TaskRunner recursively checks the conditions. The topological sort can be found here.
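For readers unfamiliar with the topological sort mentioned above, here is a generic version (Kahn's algorithm). It is a standalone sketch, not the sort Prefect uses, and the input format (a dict mapping each task to the names of its upstream tasks) is an assumption made for illustration.

```python
from collections import deque

def topological_sort(upstream):
    """Kahn's algorithm. upstream maps each task name to the names it depends on.
    Returns an order in which every task appears after all of its upstreams."""
    # Collect every task, including ones that only appear as dependencies.
    nodes = set(upstream)
    for deps in upstream.values():
        nodes.update(deps)
    indegree = {n: 0 for n in nodes}
    downstream = {n: [] for n in nodes}
    for task, deps in upstream.items():
        for d in deps:
            indegree[task] += 1
            downstream[d].append(task)
    # Start from tasks with no upstreams; sorting keeps the output deterministic.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in sorted(downstream[n]):
            indegree[m] -= 1
            if indegree[m] == 0:  # all of m's upstreams are now placed
                queue.append(m)
    if len(order) != len(nodes):
        raise ValueError("cycle detected: not a DAG")
    return order

print(topological_sort({"B": ["A"], "C": ["B"], "D": ["A"]}))  # ['A', 'B', 'D', 'C']
```

Python 3.9+ also ships graphlib.TopologicalSorter in the standard library, whose static_order() method yields a valid ordering from the same kind of predecessor mapping.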
As for the first question, note that all of this happens with or without Dask. With DaskExecutor, executor.submit() = client.submit(). With LocalExecutor, executor.submit() = a regular function call.
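To make the executor point concrete, here is a hypothetical pair of executors. The class names LocalLikeExecutor and PoolLikeExecutor and their wait() method are illustrative only, and a ThreadPoolExecutor stands in for the Dask client. The point is that the runner code calling executor.submit() is identical in both cases; only what submit() does changes.

```python
from concurrent.futures import ThreadPoolExecutor

class LocalLikeExecutor:
    """Like a local executor: submit() is just a regular function call."""

    def submit(self, fn, *args, **kwargs):
        return fn(*args, **kwargs)

    def wait(self, result):
        return result  # already computed synchronously

class PoolLikeExecutor:
    """Stand-in for a Dask-backed executor: submit() hands work to a pool
    (with Dask this would be client.submit()) and returns a future."""

    def __init__(self, pool):
        self.pool = pool

    def submit(self, fn, *args, **kwargs):
        return self.pool.submit(fn, *args, **kwargs)

    def wait(self, future):
        return future.result()

def runner(executor):
    # The same runner code works with either executor.
    handle = executor.submit(pow, 2, 10)
    return executor.wait(handle)

print(runner(LocalLikeExecutor()))  # 1024
with ThreadPoolExecutor() as pool:
    print(runner(PoolLikeExecutor(pool)))  # 1024
```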

Shiyu Gan

03/23/2022, 4:49 AM
Can you elaborate on "TaskRunner recursively checks for conditions"?

Kevin Kho

03/23/2022, 4:54 AM
Look at all this

Shiyu Gan

03/23/2022, 5:25 AM
OK, it seems the recursive task run only comes into play if the task loops.

Tomer Cagan

03/23/2022, 7:11 AM
Cool thread - thanks for that!

Shiyu Gan

03/24/2022, 8:17 AM
@Kevin Kho Can you confirm that the task runner only recursively runs a task if the task loops?

Kevin Kho

03/24/2022, 1:39 PM
I don't think so, right? A Failed state that goes to Retry would end up in that same loop, I think. I'd need to dive in more.
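Assuming Kevin's hunch is right (he was not certain), both loops and retries re-enter the runner. That can be illustrated with a toy recursive runner. This is not Prefect's TaskRunner: Loop stands in for Prefect's LOOP signal, the previous loop result is passed as an argument for simplicity, and retries happen immediately rather than after a retry delay. The names run_task, Loop and the trace strings are hypothetical.

```python
class Loop(Exception):
    """Stand-in for Prefect's LOOP signal; carries the value for the next iteration."""

    def __init__(self, result):
        super().__init__(result)
        self.result = result

def run_task(fn, max_retries=0, loop_result=None, attempt=1, trace=None):
    """Toy recursive runner: re-enters itself on LOOP and on retryable failure."""
    trace = [] if trace is None else trace
    try:
        value = fn(loop_result)
    except Loop as signal:
        # Looping: run again with the payload from this iteration.
        trace.append(f"loop({signal.result})")
        return run_task(fn, max_retries, signal.result, attempt, trace)
    except Exception:
        # Failure with retries left: run again, same input, next attempt.
        if attempt <= max_retries:
            trace.append(f"retry(attempt={attempt})")
            return run_task(fn, max_retries, loop_result, attempt + 1, trace)
        trace.append("failed")
        return "Failed", None, trace
    trace.append("success")
    return "Success", value, trace

def count_to_three(prev):
    n = (prev or 0) + 1
    if n < 3:
        raise Loop(n)
    return n

calls = {"n": 0}

def flaky(_):
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient")
    return "ok"

print(run_task(count_to_three))  # ('Success', 3, ['loop(1)', 'loop(2)', 'success'])
print(run_task(flaky, max_retries=2))  # ('Success', 'ok', ['retry(attempt=1)', 'success'])
```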