Keith Hickey
08/27/2022, 2:12 PM
… `PrefectFuture` from a prior task and pass it into `next_task.submit(wait_for=future_for_prior_task)`
Hello. Regarding the Prefect 2.0 Radar view available from the Orion server: it's neat, and provides a unique dimension of data traceability and drill-down/fan-out. However, I feel like it has over-rotated from the Prefect 1.0 schematic to strictly visualize explicit point-to-point data propagation between tasks, and it loses the logical sequence of task execution, which is equally if not more important.
Put another way: Radar highlights data flow and hides control flow.
I would argue control flow is more important: you can typically infer how data is flowing by tracing the flow of control, but inferring dependency and temporality from only the subset of tasks that explicitly exchange Python data in a tightly coupled way loses sight of implicit data dependencies among tasks. It would seem that getting away from DAGs (directed acyclic graphs) is a driving principle of Prefect 2.0 ("Goodbye, DAGs"). But I still find it immensely helpful to know the progress of a flow run, i.e. where it is in the overall control graph, or where it failed.
Part of my context is that many of our DAGs / pipelines / ETLs / data processing jobs use external data stores (databases, files, lakehouse tables on cloud storage, etc.) with very large datasets. Maybe this is more batch processing than record streaming, and the Radar UI is not as well suited for that? But it's not feasible to pull those large datasets into the memory of a Python process to be handed as Python objects from task to task, or serialized from process to process. Therefore, with only this implicit data dependency and no explicit code-level one in the flow, no relationship is mapped between those very-much-related Tasks in the Radar UI. So, questions:
1. Is there a way to code in (via the Prefect API for tasks/flows/futures) implicit data dependencies from task to task, so that those tasks are laid out and connected on the Radar UI in expanding concentric circles?
a. E.g. Task `do_a` pre-processes dataset A in `DB_A` ... Task `do_b` transports the resulting data in `DB_A` to external `API_B` ... so `do_a` must happen before `do_b` and yield control to it only after it's done.
2. Even if there's no implicit (or explicit) data dependency, just a matter of sequential control flow (e.g. as governed by the `SequentialTaskRunner`), can this be expressed via the Python Task/Flow API? When using the `SequentialTaskRunner`, tasks are laid out on the same Radar ring with no obvious order, even though they clearly run in sequential order (not concurrently).
3. Is there hope for a DAG-like schematic? And if so, would that be built further into Radar, or into another forthcoming UI?
4. There seemed to be some API constructs, when Tasks returned `PrefectFuture` (or `Task`?) objects, that allowed some explicit functions to chain things together. But the recent shift to strip return objects down to their results makes that less possible. Is that Python API, and how tasks can be tied together, still under design?
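Question 1a boils down to "run `do_b` only after `do_a` finishes, even though no Python data passes between them." Outside Prefect, the same idea can be sketched with the standard library's `concurrent.futures`: hand the downstream step the upstream futures and block on them before running it. The helper `submit_after` and its `wait_for` keyword are hypothetical stand-ins that only borrow the name of Prefect's parameter; this is a toy model, not how Prefect implements it.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List, Optional


def submit_after(
    executor: ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    wait_for: Optional[Iterable[Future]] = None,
) -> Future:
    """Hypothetical helper: run fn only after every future in wait_for is done."""
    upstream: List[Future] = list(wait_for or [])

    def run() -> Any:
        for fut in upstream:
            # result() blocks until the upstream step finishes and re-raises its
            # error, so a failed do_a keeps do_b from running
            fut.result()
        return fn(*args)

    return executor.submit(run)


log: List[str] = []


def do_a() -> None:
    time.sleep(0.05)  # stand-in for pre-processing dataset A in DB_A
    log.append("do_a")


def do_b() -> None:
    log.append("do_b")  # stand-in for moving DB_A's output to API_B


with ThreadPoolExecutor(max_workers=2) as pool:
    future_a = submit_after(pool, do_a)
    # do_b receives no data from do_a; the ordering comes only from wait_for
    future_b = submit_after(pool, do_b, wait_for=[future_a])
    future_b.result()

print(log)  # ['do_a', 'do_b']
```

Submitting in dependency order matters in this toy: a waiting step occupies a worker thread, so if dependents filled every worker before their upstream started, the pool would deadlock.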
I saw a few other posts where people may be struggling with the same conceptual issue, linked below.
• https://prefect-community.slack.com/archives/CL09KU1K7/p1649011482174799
• https://prefect-community.slack.com/archives/C0192RWGJQH/p1642702304024400
Anna Geller
08/27/2022, 2:46 PM
Oscar Björhn
08/27/2022, 4:44 PM
Keith Hickey
08/29/2022, 1:11 PM
… `wait_for`. I briefly tried it in this subflow, but all the tasks were just laid out on the same ring in arbitrary order rather than fanning out onto outer rings (which is what I assume that visual metaphor is supposed to mean: a wider ring has dependencies on things on an inner ring).
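```python
import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@flow(name="Pipeline Setup Flow", task_runner=SequentialTaskRunner())
async def setup_subflow():
    # First attempt: wait_for is given the task object itself, not a PrefectFuture
    await do_step_1()
    await do_step_2(wait_for=do_step_1)
    await do_step_3(wait_for=do_step_2)
    # ...
    # await do_step_n(wait_for=do_step_n_minus_1)
    # await do_step_n_plus_1(wait_for=do_step_n)


@task(name="Step 1")
async def do_step_1():
    await asyncio.sleep(5)


@task(name="Step 2")
async def do_step_2():
    await asyncio.sleep(5)

# ...
```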
It'd be good to see a code example with the resulting image to cement the intended usage. I found this in the API docs for `prefect.tasks`, which seems to indicate that I need to add the `wait_for` param to the `submit(...)` call, but my IDE is hinting me to add it directly on the task call. I tried the latter but will try the former. The code docs below also take the `PrefectFuture` return object as the input to `wait_for`.
It's a little fumbly trying to figure this out right off the bat, and the worth of the resulting visual is a big make-or-break criterion for tools like this, so a dedicated section of the getting-started / tutorial docs showing how to enforce ordering/dependency control flow would be very welcome.
Enforce ordering between tasks that do not exchange data
```python
from prefect import flow, task


@task
def task_1():
    pass


@task
def task_2():
    pass


@flow
def my_flow():
    x = task_1.submit()

    # task 2 will wait for task_1 to complete
    y = task_2.submit(wait_for=[x])
```
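Keith's reading of the Radar metaphor (a task sits one ring further out than the deepest task it depends on) can be made concrete with a small function over an upstream map. This is a hypothetical model of the layout rule, not Prefect's rendering code, and it assumes the graph is acyclic. It illustrates why a `SequentialTaskRunner` alone puts every task on one ring: execution order is not a dependency edge, so every task stays at depth 0 until something like `wait_for=[x]` adds an edge.

```python
from typing import Dict, List


def ring_levels(upstream: Dict[str, List[str]]) -> Dict[str, int]:
    """Hypothetical layout rule: ring = 1 + deepest upstream ring (0 if none).

    Assumes the graph is acyclic; a cycle would raise RecursionError.
    """
    levels: Dict[str, int] = {}

    def level(name: str) -> int:
        if name not in levels:
            parents = upstream.get(name, [])
            if parents:
                levels[name] = 1 + max(level(p) for p in parents)
            else:
                levels[name] = 0
        return levels[name]

    for name in upstream:
        level(name)
    return levels


# Run one after another, but with no declared dependencies: everything on ring 0
sequential_only = ring_levels({"step_1": [], "step_2": [], "step_3": []})
# Each step waits on the previous one, as with wait_for=[previous_future]
chained = ring_levels({"step_1": [], "step_2": ["step_1"], "step_3": ["step_2"]})
print(sequential_only)  # {'step_1': 0, 'step_2': 0, 'step_3': 0}
print(chained)          # {'step_1': 0, 'step_2': 1, 'step_3': 2}
```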
Oscar Björhn
08/29/2022, 3:01 PM
Keith Hickey
08/29/2022, 3:08 PM
`.submit(wait_for=<PrefectFuture>)` is precisely what I needed, though it took a couple of tries to settle on that. What I figured out:
My simple example above now works if written like this:
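```python
import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@flow(name="Pipeline Setup Flow", task_runner=SequentialTaskRunner())
async def setup_subflow():
    # In an async flow, awaiting .submit() gives back a PrefectFuture
    f1 = await do_step_1.submit()
    # wait_for takes an iterable of futures; each step waits on the previous one
    f2 = await do_step_2.submit(wait_for=[f1])
    f3 = await do_step_3.submit(wait_for=[f2])
    # ...
    # fn = await do_step_n.submit(wait_for=[fn_minus_1])
    # fn_plus_1 = await do_step_n_plus_1.submit(wait_for=[fn])


@task(name="Step 1")
async def do_step_1():
    await asyncio.sleep(5)


@task(name="Step 2")
async def do_step_2():
    await asyncio.sleep(5)

# ...
```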
1. The API Reference docs under `prefect.tasks` show that `Task.__call__(...)` and `Task.submit(...)` both support a `wait_for: Optional[Iterable[prefect.futures.PrefectFuture]]` parameter. However, only when used with `.submit(wait_for=...)` did this seem to have the desired effect on the UI, chaining each subsequent Task one circle further out.
a. Maybe that's a bug in the implementation of `Task.__call__(wait_for=some_future)`. I noticed that by default, when using `__call__`, it sets the task's `task_runner` to `SequentialTaskRunner`, whereas with `submit(...)` it defaults to `None` and therefore falls back to the Flow's `task_runner`. Maybe that's interfering?
b. Still, even when the Flow's `task_runner=SequentialTaskRunner()`, using `submit(wait_for=...)` renders the UI in concentric/dependent circles correctly.
2. The thing (object) you have to `wait_for` is a `PrefectFuture`, i.e. the return value of a `.submit()` call. And now that I mention that, this is probably why this only works with `.submit(...)` and not `__call__()`, as calling the task directly like `result = my_task()` will strip the `PrefectFuture` down to its result and return that. My tasks don't return anything, so it'll be `None` ... nothing to `wait_for`.
3. `PrefectFuture.wait()` doesn't do anything for the UI with dependent/concentric tasks. I feel there's an opportunity here. Calling `d = my_task.submit().wait()` should be an implicit `wait_for` for whatever task or group of tasks follows it.
a. Maybe it's just a matter of changing the thinking: using `wait_for` on the next task or batch of concurrent tasks instead of eagerly calling `.wait()` on the task you just submitted.
b. But storing return objects and passing them forward to subsequent tasks is tedious. A good implementation would be that calling `PrefectFuture.wait()` freezes a ring on Radar and pushes all subsequent tasks to the next outer ring. And if there's a batch of concurrent tasks that need to be waited on, add a `Task.wait_all(tasks=Iterable[PrefectFuture])` ... so that batch can remain on the same ring, while all tasks after them that must wait on them are bumped to an outer ring.
c. Furthering this opportunity in `async`/`await` syntax, since `.wait()` (I think) has the same effect as `await my_task.submit()`, it'd be great if the rings were bumped out implicitly on encountering an awaited task.
d. And on the batch/group side of things, if `asyncio.gather(t1.submit(), t2.submit(), t3.submit())` is encountered, it's "waiting" on that subset of 3 tasks, and everything after should be bumped out a ring.
4. I can't yet find a way to link subflows to each other. A subflow doesn't return a `PrefectFuture`, but the result of the flow (or its state). I guess I could try to force the subflow to return the `PrefectFuture` of its last Task, but that feels hacky.
a. Any tips here appreciated.
b. I'd like to be able to run subflows in sequence, and make it so that a subsequent subflow has a `wait_for` condition on a prior subflow or a prior task. Either one.
c. Or, if trying to run a set of subflows in parallel using `asyncio.gather(sf1(), sf2(), sf3())`, allow the next Task or subflow to `wait_for` the results/futures of all 3.
d. I feel like it'd perhaps be natural for a `Flow` to also have a `.submit()` function that, when used, returns a `PrefectFuture` too, like a `Task` does. And when calling the flow with `__call__(...)` instead of `.submit()`, just return the result.
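The proposal in items 3 and 4 above (an awaited task, a `wait_all`, or an `asyncio.gather` batch closes the current ring and pushes whatever follows one ring out) can be modeled with plain `asyncio`. `RingTracker`, `step`, and `barrier` are hypothetical names invented for this sketch; it only records where each step would land under that rule and does not touch Prefect.

```python
import asyncio
from typing import Awaitable, Dict


class RingTracker:
    """Hypothetical model: each barrier closes the current ring."""

    def __init__(self) -> None:
        self.ring = 0
        self.placement: Dict[str, int] = {}

    async def step(self, name: str, seconds: float = 0.01) -> None:
        # A step lands on whatever ring is open when it starts running
        self.placement[name] = self.ring
        await asyncio.sleep(seconds)  # stand-in for real work

    async def barrier(self, *steps: Awaitable[None]) -> None:
        # Run the batch concurrently, wait for all of it, then open the next ring
        await asyncio.gather(*steps)
        self.ring += 1


async def pipeline() -> Dict[str, int]:
    tracker = RingTracker()
    await tracker.barrier(tracker.step("setup"))  # a single awaited step
    await tracker.barrier(  # a concurrent batch stays on one ring
        tracker.step("load_1"), tracker.step("load_2"), tracker.step("load_3")
    )
    await tracker.barrier(tracker.step("publish"))  # bumped past the whole batch
    return tracker.placement


placement = asyncio.run(pipeline())
print(placement)  # {'setup': 0, 'load_1': 1, 'load_2': 1, 'load_3': 1, 'publish': 2}
```

The same rule would cover item 4c: gathering three subflows would keep them on one ring and push the next step outward.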
CC: @Anna Geller
Zanie
08/31/2022, 7:34 PM
> Maybe that's a bug in the implementation of `Task.__call__(wait_for=some_future)`. I noticed that by default, when using `__call__`, it sets the task's `task_runner` to `SequentialTaskRunner`, whereas with `submit(...)` it defaults to `None` and therefore falls back to the Flow's `task_runner`. Maybe that's interfering?
If this is the case, it is a bug. Can you open a minimal example on GitHub?
If you pass `None` to `wait_for`, we definitely won't be able to track the relationship. But if you do
```python
x = my_task()
y = my_task(x)
```
where your task is returning an object that is unique (e.g. not a global singleton like `None`), we will do our best to track that relationship despite the lack of futures.
Keith Hickey
08/31/2022, 7:45 PM2. The thing (object) you have toI didn't realize it could also just "wait_for" with arbitrary data given, rather than ais await_for
, i.e. the return of aPrefectFuture
call. And now that I mention that, this is probably why this only works with.submit()
and not.submit(...)
, as calling the task directly like__call__()
will strip the PrefectFuture down to its result and return that. My tasks don't return anything, so it'll beresult = my_task()
... nothing toNone
.wait_for
PrefectFuture
. That's neat.
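Zanie's point that Prefect does its best to link `x = my_task()` and `y = my_task(x)` even without futures can be pictured as remembering which step produced each returned object and recognizing that same object when it is passed onward. The toy below is a simplified illustration of that idea keyed on `id()`, not Prefect's internals; `IdentityTracker` and `call` are hypothetical. It also shows why `None` carries no information: every step that returns nothing returns the very same object.

```python
from typing import Any, Callable, Dict, List, Tuple


class IdentityTracker:
    """Hypothetical: infer upstream edges from the identity of passed-in objects."""

    def __init__(self) -> None:
        self._producer: Dict[int, str] = {}
        self._keep_alive: List[Any] = []  # hold results so their ids are not reused
        self.edges: List[Tuple[str, str]] = []

    def call(self, name: str, fn: Callable[..., Any], *args: Any) -> Any:
        for arg in args:
            upstream = self._producer.get(id(arg))
            if upstream is not None:
                self.edges.append((upstream, name))
        result = fn(*args)
        # None, True and False are shared singletons, so they cannot identify a
        # producer (CPython's cached small ints and interned strings have the same issue)
        if result is not None and not isinstance(result, bool):
            self._producer[id(result)] = name
            self._keep_alive.append(result)
        return result


tracker = IdentityTracker()
x = tracker.call("task_1", lambda: {"rows": 10})  # unique object: trackable
tracker.call("task_2", lambda data: sorted(data), x)
n = tracker.call("task_3", lambda: None)  # returns nothing, like Keith's tasks
tracker.call("task_4", lambda value: value, n)
print(tracker.edges)  # [('task_1', 'task_2')]
```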
But in my case, my Tasks are not exchanging data directly, so they just return `None` when using `__call__()`.
So maybe it's not a bug. But it is misleading to try to use the `wait_for` arg in `__call__()` ... and the odd case where it can derive relationships based on the data returned might be a minority case. It could be worth thinking about getting rid of that param in `__call__()` to force people to use `.submit()`. Or throw an error if `wait_for` in `__call__()` receives `None` (I like errors for training developers, but a runtime warning could also work).
@Zanie
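Keith's suggestion to reject `None` in `wait_for` can be sketched as a small guard. `check_wait_for` is a hypothetical function, not part of Prefect. One detail matters: if the parameter's default is itself `None`, an explicit `wait_for=result` where `result` is `None` looks identical to not passing anything, so the sketch assumes a private sentinel as the default.

```python
import warnings
from typing import Any, List

_UNSET: Any = object()  # sentinel: distinguishes "not passed" from an explicit None


def check_wait_for(wait_for: Any = _UNSET, strict: bool = True) -> List[Any]:
    """Hypothetical guard: refuse (or warn about) None inside wait_for."""
    if wait_for is _UNSET:
        return []  # nothing to wait on
    items = [None] if wait_for is None else list(wait_for)
    if any(item is None for item in items):
        message = (
            "wait_for received None, usually the result of calling a task directly; "
            "use .submit() to get a future to wait on"
        )
        if strict:
            raise ValueError(message)
        warnings.warn(message, RuntimeWarning, stacklevel=2)
    return [item for item in items if item is not None]


# Strict mode: both a bare None and a list containing None are rejected
rejected = []
for bad in (None, [None]):
    try:
        check_wait_for(bad)
    except ValueError:
        rejected.append(bad)

# Lenient mode: warn, drop the None, keep the rest
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    kept = check_wait_for([None, "upstream_future"], strict=False)
```

Whether an error or a warning is the better default is exactly the trade-off raised above; the `strict` flag just makes both behaviors easy to compare.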