Alex Cano
04/07/2020, 1:37 AM

Chris White
04/07/2020, 1:49 AM
The TaskRunner is responsible for both submitting and waiting on the children tasks that are dynamically spawned. The reason this is so important is that the futures created here are more difficult to communicate back to the flow runner and have historically caused errors, because we are gathering futures created with a different client (the errors here were possibly more on Prefect than on Dask, but they still represent a complication).

executor.submit does not mean that the task will begin running immediately, only that it has been communicated to the executor. So when using the LocalExecutor that call blocks until completion, but when using the DaskExecutor the call returns immediately. I mention this because neither of the runners has the role of knowing when a task is ready to run from the computational standpoint; they only determine whether a task should proceed with a run given the information about the upstream states (which the executor is responsible for ensuring are completed). All that would matter is that it sees the upstream dependency on M tasks and would just need to wait until those are done.

The new implementation will do just this --> within the flow runner, if a task is mapped, we will wait until its upstream result is ready; at that point it will know the width of the graph and can submit each child itself. This will allow the flow runner to submit all children and still move on to submitting more work to the executor, which is the key to enabling DFE.
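A rough sketch of that flow-runner behaviour, using a plain concurrent.futures executor rather than Prefect's actual FlowRunner/Executor interfaces (submit_mapped_task and run_child are hypothetical names for illustration only): block on the mapped task's upstream result, learn the width of the map, fan out every child, and keep going without waiting on the children themselves.

from concurrent.futures import ThreadPoolExecutor

def run_child(i, upstream_value):
    # Hypothetical stand-in for running one child of the mapped task.
    return upstream_value[i] + 1

def submit_mapped_task(executor, upstream_future):
    # Wait only for the mapped task's upstream result; once it resolves
    # we know the width of the map and can submit each child ourselves.
    upstream_value = upstream_future.result()
    children = [
        executor.submit(run_child, i, upstream_value)
        for i in range(len(upstream_value))
    ]
    # Return the child futures without waiting on them, so the caller
    # can keep submitting more work to the executor (the DFE idea).
    return children

with ThreadPoolExecutor() as executor:
    upstream = executor.submit(lambda: list(range(5)))
    child_futures = submit_mapped_task(executor, upstream)
    print([f.result() for f in child_futures])  # [1, 2, 3, 4, 5]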
Alex Cano
04/07/2020, 2:46 AM

Chris White
04/07/2020, 3:01 AM
wait on the futures within the TaskRunner. From the FlowRunner perspective we would have:

state = executor.submit(**kwargs)

and this state object is actually a Dask future. The resolved future would be a Mapped state with more futures representing the children as an attribute (probably mapped_states).
The catch is that downstream submit calls that the flow runner makes would need to know to wait not only for the resolved Mapped state but also for all the children it submitted. That sort of futures-within-futures unpacking is possible but introduces loads more complexity.
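To make that two-level wait concrete, here is a minimal sketch using plain concurrent.futures rather than Prefect or Dask objects; MappedStub and run_mapped are hypothetical stand-ins for a Mapped state and a mapped parent task, not real Prefect APIs. A downstream consumer has to wait twice: once for the outer future, and again for every child future it finds inside mapped_states.

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

@dataclass
class MappedStub:
    # Illustrative stand-in for a Mapped state whose children are futures.
    mapped_states: list = field(default_factory=list)

def run_mapped(executor, n):
    # The parent task resolves to a Mapped-like object holding child futures.
    children = [executor.submit(lambda x=x: x * 2) for x in range(n)]
    return MappedStub(mapped_states=children)

with ThreadPoolExecutor() as executor:
    outer = executor.submit(run_mapped, executor, 3)

    # Waiting on `outer` alone is not enough; the downstream also has to
    # unpack and wait on every future inside mapped_states.
    mapped = outer.result()
    child_results = [f.result() for f in mapped.mapped_states]
    print(child_results)  # [0, 2, 4]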
Alex Cano
04/07/2020, 3:13 AM

Chris White
04/07/2020, 3:15 AM
The future-within-future is more of a Dask situation than a Prefect one; Dask will see that the Mapped future is resolved and tell the downstreams "you're good to go!" and they would begin executing, operating on Dask futures as their input data (coming from mapped_states) instead of appropriately resolved data — Dask would only know to resolve and wait for the "outer" level of futures and would need extra information to know that there are more futures within that future that are relevant.

from dask import delayed
@delayed
def return_futures(n):
    return [delayed(x + 1) for x in range(n)]

@delayed
def sumit(upstream):
    return sum(upstream)

final = sumit(return_futures(10))
final.compute()
# Delayed('add-8e93f1daac7666a88c0b0de72667c460')
this doesn't highlight any error messages, but notice that the sumit function ran (evidenced by final.compute() actually returning) on a list of unresolved futures, not on a list of numbers like we wanted.

Alex Cano
04/07/2020, 3:21 AM

Chris White
04/07/2020, 3:21 AM
if you modify sumit to:
@delayed
def sumit(upstream):
    print([type(x) for x in upstream])
    return sum(upstream)
you'll see it's operating on futures instead of numbers.
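One way to get actual numbers out of this toy example, assuming the default local scheduler, is to resolve the nested Delayed objects explicitly inside the task; this is exactly the extra unnesting step Dask will not perform on its own (with the distributed scheduler the same idea generally needs something like worker_client instead).

import dask
from dask import delayed

@delayed
def return_futures(n):
    # Same as above: returns a list of Delayed objects, not numbers.
    return [delayed(x + 1) for x in range(n)]

@delayed
def sumit(upstream):
    # `upstream` arrives as a list of unresolved Delayed objects, so
    # compute them explicitly before summing; the synchronous scheduler
    # keeps the nested compute in the current thread.
    resolved = dask.compute(*upstream, scheduler="synchronous")
    return sum(resolved)

final = sumit(return_futures(10))
print(final.compute())  # 55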
Alex Cano
04/07/2020, 3:24 AM

Chris White
04/07/2020, 3:26 AM
DaskExecutor
Alex Cano
04/07/2020, 6:05 PM
Been playing around w/ this for a bit, and I think I understand more of the issue (specifically with Dask as an executor), but I'd think this could be any kind of Executor's problem, right? Outside of the LocalExecutor, which isn't passing futures around at all.
If we listed the requirement of waiting on the futures to be completed (to not have the nested futures problem), we'd need to transfer that responsibility somewhere else transparently for backwards compatibility reasons. With people needing to populate Task.run with their code, do you think that a change like that could occur in the task runner instead?
I was toying around with a naive implementation of the unnesting of futures and came up with this:
import time

from dask.delayed import Delayed
from dask.distributed import Future

def unwrap_or_wait(future_or_value):
    """
    Takes any argument and returns it directly unless it's associated
    with Dask's delayed computation. If the incoming value is a future,
    the function recurses until the future has resolved, or if the
    incoming value is a delayed computation, it resolves the
    computation and returns the value behind it.
    """
    if not isinstance(future_or_value, (Future, Delayed)):
        # Item's value is passed in directly
        return future_or_value
    if isinstance(future_or_value, Future):
        if not future_or_value.done():
            print("Future not done, sleeping and waiting.")
            time.sleep(0.25)
            return unwrap_or_wait(future_or_value)
        else:
            # Handle nested futures
            futures_value = future_or_value.result()
            return unwrap_or_wait(futures_value)
    else:
        print("Delayed value found. Computing and returning!")
        computed = future_or_value.compute()
        return computed
The more I played around with it, the more I was thinking asyncio would help with all the waiting & submitting when ready, but that’s another conversation entirely!
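For what it's worth, a tiny sketch of that idea in pure asyncio (no Dask or Prefect involved; child and downstream are hypothetical stand-ins): asyncio.as_completed replaces the sleep-and-retry polling by handing back each input as soon as it is ready, so downstream work can be submitted immediately.

import asyncio
import random

async def child(i):
    # Stand-in for one mapped child task that finishes at an arbitrary time.
    await asyncio.sleep(random.random() / 10)
    return i + 1

async def downstream(value):
    # Stand-in for a task that depends on a single child's result.
    return value * 2

async def main():
    children = [asyncio.create_task(child(i)) for i in range(5)]
    # Submit downstream work as each child resolves, rather than sleeping
    # in a loop and re-checking whether the futures are done yet.
    downstreams = [
        asyncio.create_task(downstream(await fut))
        for fut in asyncio.as_completed(children)
    ]
    print(sorted(await asyncio.gather(*downstreams)))  # [2, 4, 6, 8, 10]

asyncio.run(main())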