# prefect-contributors
a
Hey all, wanted to chat about DFE for a hot second to get a better understanding of the Prefect engine, the decisions that went into the current design, and things to watch out for in the future design. In theory, I 100% understand the difference between the DFE and BFE execution models, but I want to check whether I’m understanding how Prefect currently handles BFE. I’d also like to know, big picture, why the idea I’m having for DFE wouldn’t work.

My current understanding of BFE: when the FlowRunner goes to submit each task, the task gets initialized and prepared for running, and the runner does a quick check to see whether it’s the parent of a mapped task (about to submit a bunch of tasks) or a child of that parent (one of the spawned mapped tasks). The FlowRunner then waits for the Task (or all the children of that Task) before continuing to walk the DAG. In the context of Prefect, this means that each Task (or parent of a group of mapped Tasks) represents the computation, and the reason BFE works so “easily” is that you’re only ever going down 1 level of execution. In addition, the Task graph assumes that a task can have either a Task or a group of mapped Tasks as a parent.

My understanding of how DFE would be executed is to add a recursion option to both the 1 level of execution (which could now be 1 to N levels) and the parent, which could be either a Task or 1 to N levels of mapped parents. But in actuality (I think) all that would matter is that it sees the upstream dependency on M tasks, and would just need to wait until those are done. I’m trying to wrap my head around the change of semantics (specifically, what would need to change for Prefect). I think Chris mentioned on the GitHub issue that task running logic would need to move to the FlowRunner. I would think the only portion that needs to change is how gathering futures works, and whether we need to wait on the completion of the futures or can just pass the futures themselves to any potential downstream tasks.

Things I’m not sure about:
• When tasks are getting prepared to run, is the assumption that since the Task was told to run, it has the go-ahead (outside of the light validation that gets done)?
• Would DFE require some form of weighted futures to stay performant (which sub-trees to prioritize)?
• Should the determination of which upstream tasks are ready fall onto the TaskRunner of the downstream task? Or is that in the FlowRunner by design?
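To make the ordering difference concrete, here is a toy model of the two execution orders. It is only a sketch: `breadth_first` and `depth_first` are hypothetical names, nothing here is Prefect's engine, and it records just the order in which two mapped stages would visit a few items.

```python
def breadth_first(items, stages):
    """Finish a whole stage for every item before starting the next stage."""
    order = []
    for stage in stages:
        for item in items:
            order.append((stage, item))
    return order


def depth_first(items, stages):
    """Carry each item through every stage before moving to the next item."""
    order = []
    for item in items:
        for stage in stages:
            order.append((stage, item))
    return order


bfe_order = breadth_first(["a", "b"], ["extract", "load"])
dfe_order = depth_first(["a", "b"], ["extract", "load"])
print(bfe_order)
print(dfe_order)
```

Under BFE no `load` step starts until every `extract` step has finished; under DFE item `a` can be loaded while `b` is still being extracted.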
c
I appreciate that you’ve done your homework on this! Before I address your three questions, let me clarify one important point: currently the `TaskRunner` is responsible for both submitting and waiting on the child tasks that are dynamically spawned. This matters because the futures created there are more difficult to communicate back to the flow runner and have historically caused errors, since we are gathering futures created with a different client (the errors here were possibly more on Prefect than on Dask, but they still represent a complication).
For your questions:
- Yes, that is correct.
- Good question; this will have to be empirically tested to know for sure. The implementation that should go out this week will not perform any optimizations.
- Executors are the objects responsible for deciding when and where a task should run. So calling `executor.submit` does not mean that the task will begin running immediately, only that it has been communicated to the executor. When using the `LocalExecutor` that call blocks until completion, but when using the `DaskExecutor` the call returns immediately. I mention this because neither of the runners has the role of knowing when a task is ready to run from a computational standpoint; they only determine whether a task should proceed with a run given the information about the upstream states (which the executor is responsible for ensuring are completed).
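The blocking versus non-blocking `submit` distinction can be sketched with the standard library alone. `BlockingExecutor` and `AsyncExecutor` below are hypothetical stand-ins invented for this illustration, not Prefect's `LocalExecutor` or `DaskExecutor`, and the `gate` event exists only to make the timing deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def gated_add(x, y):
    """Waits until the gate is opened, then returns the sum."""
    gate.wait()
    return x + y


class BlockingExecutor:
    """Hypothetical synchronous executor: submit() runs the task inline."""

    def submit(self, fn, *args):
        return fn(*args)


class AsyncExecutor:
    """Hypothetical asynchronous executor: submit() returns a future right away."""

    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=2)

    def submit(self, fn, *args):
        return self._pool.submit(fn, *args)

    def shutdown(self):
        self._pool.shutdown()


async_executor = AsyncExecutor()
pending = async_executor.submit(gated_add, 1, 2)
was_pending = not pending.done()  # submit returned before the task could finish
gate.set()                        # let the task proceed
async_result = pending.result()   # this is where the waiting actually happens
async_executor.shutdown()

# The gate is already open, so the inline call returns the value directly.
blocking_result = BlockingExecutor().submit(gated_add, 1, 2)
```

In both cases the caller only hands work over; whether the call returns a value or a handle to a value still being computed is the executor's business.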
You hit the nail on the head with “all that would matter is that it sees the upstream dependency on M tasks, and would just need to wait until those are done.” The new implementation will do just this: within the flow runner, if a task is mapped, we will wait until its upstream result is ready, at which point the flow runner will know the width of the graph and can submit each child itself. This will allow the flow runner to submit all children and still move on to submitting more work to the executor, which is the key to enabling DFE.
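A rough sketch of that idea, using `concurrent.futures` as a stand-in for the executor. The function names (`extract`, `double`, `add_ten`, `run_after`, `run_flow`) are made up for illustration and this is not the actual Prefect implementation: the runner waits only for the one upstream result that determines the map width, submits every child itself, and then keeps submitting downstream work without waiting for siblings.

```python
from concurrent.futures import ThreadPoolExecutor


def extract():
    return [1, 2, 3]


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


def run_after(pool, parent_future, fn):
    """Hypothetical helper: run fn on a parent's result once that parent is done."""
    return pool.submit(lambda: fn(parent_future.result()))


def run_flow(pool):
    upstream = pool.submit(extract)
    # The width of the map is unknown until the upstream result exists,
    # so this is the only place the runner itself has to wait.
    items = upstream.result()
    # The runner submits one child per element...
    first_level = [pool.submit(double, item) for item in items]
    # ...and immediately submits the next mapped level. Each downstream
    # child depends only on its own parent, not on its siblings.
    second_level = [run_after(pool, parent, add_ten) for parent in first_level]
    return [future.result() for future in second_level]


with ThreadPoolExecutor(max_workers=4) as pool:
    results = run_flow(pool)
print(results)
```

Because the futures never leave the runner, nothing downstream has to unpack futures hidden inside another future's result.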
a
This is a super well thought out answer, thanks so much for the great context! Out of curiosity, if retrieving futures from Dask was reliable, would most of this problem disappear? It seems like that is a part of some of the redesign decision. If futures were reliable in that way, couldn’t any level of mapping be submitted then passed around as we’d hope?
c
Almost. There’s an added wrinkle: let’s say we didn’t `wait` on the futures within the `TaskRunner`. From the `FlowRunner` perspective we would have:
```python
state = executor.submit(**kwargs)
```
and this `state` object is actually a Dask future. The resolved future would be a `Mapped` state with more futures representing the children as an attribute (probably `mapped_states`). The catch is that downstream `submit` calls that the flow runner makes would need to know to wait not only for the resolved `Mapped` state but also for all the children it submitted. That sort of futures-within-futures unpacking is possible but introduces loads more complexity.
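The same shape can be reproduced with standard-library futures. This is only an analogy: `mapped_parent` and `child` are hypothetical names, the returned list stands in for the children held on a `Mapped` state, and the event just keeps the children pending long enough to observe them.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release_children = threading.Event()


def child(x):
    """A spawned child that cannot finish until it is released."""
    release_children.wait()
    return x + 1


def mapped_parent(pool, n):
    """Submits n children and returns their futures without waiting on them."""
    return [pool.submit(child, x) for x in range(n)]


with ThreadPoolExecutor(max_workers=4) as pool:
    outer = pool.submit(mapped_parent, pool, 3)
    children = outer.result()  # the outer future resolves right away
    outer_done = outer.done()
    any_child_done = any(future.done() for future in children)

    # A downstream consumer has to wait on the inner futures explicitly.
    release_children.set()
    wait(children)
    total = sum(future.result() for future in children)
```

The outer future reports done while every child is still pending, which is exactly the moment a downstream task would be told it may start.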
a
So would this mean we’d be operating in no man’s land while the future is resolving? Would this be a problem? Or can it just be assigned Running and we call it a day? I assumed the future-within-future unpacking fun would be par for the course. Given your reaction to it, I’m assuming there’s a way to avoid that. The other option is doing the unpacking, but wouldn’t the complexity stop at the relationship between parent and child? Is there any other Prefect complexity beyond having to recursively walk a tree of children instead of one level of children?
c
The future-within-future is more of a Dask situation than a Prefect one; Dask will see that the `Mapped` future is resolved and tell the downstreams “you’re good to go!” and they would begin executing, operating on Dask futures as their input data (coming from `mapped_states`) instead of appropriately resolved data. Dask would only know to resolve and wait for the “outer” level of futures and would need extra information to know that there are more relevant futures within that future.
1 min let me show you a toy example
This is very much a schematic of what’s happening behind the scenes, but this highlights the futures-within-futures issue:
```python
from dask import delayed

@delayed
def return_futures(n):
    # Returns a list of Delayed objects rather than plain numbers
    return [delayed(x + 1) for x in range(n)]

@delayed
def sumit(upstream):
    return sum(upstream)

final = sumit(return_futures(10))
final.compute()
# Delayed('add-8e93f1daac7666a88c0b0de72667c460')
```
This doesn’t produce any error messages, but notice that the `sumit` function ran (evidenced by `final.compute()` actually returning), yet it ran on a list of unresolved futures, not on a list of numbers like we wanted.
a
Sounds good! Thanks for putting in the effort on this! Definitely learning a lot! I’d think that since every Prefect task executes via the run method, the unnesting could happen there, but it definitely seems better to avoid it if at all possible.
c
anytime! I’m super happy to help you understand how it all works more
actually if you update `sumit` to:
```python
@delayed
def sumit(upstream):
    print([type(x) for x in upstream])
    return sum(upstream)
```
you’ll see it’s operating on futures instead of numbers
a
Gonna do some dishes from dinner then play around with this to make sure I understand it! Will come back with more questions tomorrow!
c
yup no worries!
if you want to play around with this in Prefect land, remove this wait: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L798 and then run your flows using a `DaskExecutor`
a
> The future-within-future is more of a Dask situation than a Prefect one; Dask will see that the `Mapped` future is resolved and tell the downstreams “you’re good to go!” and they would begin executing, operating on Dask futures as their input data (coming from `mapped_states`) instead of appropriately resolved data. Dask would only know to resolve and wait for the “outer” level of futures and would need extra information to know that there are more relevant futures within that future.
Been playing around w/ this for a bit, and I think I understand more of the issue (specifically with Dask as an executor), but I’d think this could be any kind of Executor’s problem, right? Outside of the `LocalExecutor`, which isn’t passing futures around at all. If we lifted the requirement of waiting on the futures to be completed (the wait that currently avoids the nested futures problem), we’d need to transfer that responsibility somewhere else transparently for backwards compatibility reasons. With people needing to populate `Task.run` with their code, do you think a change like that could occur in the task runner instead? I was toying around with a naive implementation of the unnesting of futures and came up with this:
```python
import time

from dask.delayed import Delayed
from dask.distributed import Future


def unwrap_or_wait(future_or_value):
    """
    Takes any argument and returns it directly unless it's associated
    with Dask's delayed computation. If the incoming value is a future,
    the function polls until the future has resolved and then unwraps
    its result (which may itself be a future); if the incoming value is
    a delayed computation, it resolves the computation and returns the
    value behind it.
    """
    if not isinstance(future_or_value, (Future, Delayed)):
        # Item's value is passed in directly
        return future_or_value

    if isinstance(future_or_value, Future):
        # Poll in a loop instead of recursing, so a long wait cannot
        # exhaust the recursion limit
        while not future_or_value.done():
            print("Future not done, sleeping and waiting.")
            time.sleep(0.25)
        # Handle nested futures
        return unwrap_or_wait(future_or_value.result())

    print("Delayed value found. Computing and returning!")
    return future_or_value.compute()
```
The more I played around with it, the more I was thinking asyncio would help with all the waiting & submitting when ready, but that’s another conversation entirely!
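One gap in a naive unwrapper is that the children of a mapped parent arrive as a list of futures, not as a single future. Here is a minimal standard-library sketch of an unwrapper that also descends into lists and tuples. `resolve_nested` and `spawn_children` are hypothetical names, and `concurrent.futures.Future` is only standing in for a Dask future here, so treat it as an illustration of the recursion rather than something to drop into Prefect.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve_nested(value):
    """Recursively swap futures, and futures inside lists or tuples, for results."""
    if isinstance(value, Future):
        # result() blocks until the future is done, so no polling is needed
        return resolve_nested(value.result())
    if isinstance(value, (list, tuple)):
        return type(value)(resolve_nested(item) for item in value)
    return value


with ThreadPoolExecutor(max_workers=4) as pool:

    def spawn_children(n):
        # Stand-in for a mapped parent: returns futures, not values
        return [pool.submit(lambda x=x: x + 1) for x in range(n)]

    outer = pool.submit(spawn_children, 3)
    resolved = resolve_nested(outer)

print(resolved)
```

Plain values pass straight through, so the same call is safe on inputs that were never futures to begin with.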