Feedback on Radar view
# prefect-ui
k
BLUF: Am I missing some kind of control-flow UI for a DAG, or is Radar the only UI? Will Radar or another UI show dependent order of operations at some point?

_*UPDATE: See threaded replies. You can direct the control flow shown in the UI if you get a `PrefectFuture` from a prior task and pass it into `next_task.submit(wait_for=future_for_prior_task)`.*_

Hello. Regarding the Prefect 2.0 Radar view available from the Orion server: it's neat, and it provides a unique dimension of data traceability and drill-down/fan-out. However, I feel it has over-rotated from the Prefect 1.0 schematic to strictly visualize explicit point-to-point data propagation between tasks, and it loses the logical sequence of task execution, which is equally if not more important.

Put another way: Radar highlights data flow and hides control flow. I would argue control flow is more important, because you can typically infer how data is flowing by tracing the flow of control. But inferring dependency and temporality only from the subset of tasks that explicitly exchange Python data in a tightly coupled way loses sight of implicit data dependencies among tasks.

It would seem that getting away from DAGs (directed acyclic graphs) is a driving principle of Prefect 2.0 ("Goodbye, DAGs"). But I still find it immensely helpful to know the progress of the flow execution, i.e. where it is in the overall control graph, or where it failed.

Part of my context is that many of our DAGs / pipelines / ETLs / data-processing jobs use external data stores (databases, files, lakehouse tables on cloud storage, etc.) to process very large datasets. Maybe this is more batch processing than record streaming, and the Radar UI is not as well suited for that? It's not feasible to pull those large datasets into the memory of a Python process to be handed from task to task as Python objects, or serialized from process to process. So with only this implicit data dependency, and no explicit code-level one in the flow, no relationship is mapped between those very-much-related tasks in the Radar UI.

So, questions:
1. Is there a way to code in (via the Prefect API for tasks/flows/futures) implicit data dependencies from task to task?
   So that those tasks are laid out and connected on the Radar UI in expanding concentric circles?
   a. For example, task `do_a` pre-processes dataset `A` in `DB_A`, and task `do_b` transports the resulting data in `DB_A` to the external `API_B`. So `do_a` must happen before `do_b`, and should yield control to it only after it is done.
2. Even if there's no implicit (or explicit) data dependency, but just a matter of sequential control flow (e.g. as governed by the `SequentialTaskRunner`), can this be expressed via the Python Task/Flow API?
   When using the `SequentialTaskRunner`, tasks are laid out on the same Radar ring in no obvious order, even though they clearly run sequentially (not concurrently).
3. _*Is there hope for a DAG-like schematic? And if so, would that be built further into Radar, or into another forthcoming UI?*_
4. There seemed to be some API constructs, back when tasks returned `PrefectFuture` (or `Task`?) objects, that allowed explicit functions to chain things together. But the recent shift to strip return objects down to their results makes that less possible. Is that Python API, and how tasks can be tied together, still under design?

I saw a few other posts where people may be struggling with the same conceptual issue, linked below.
• https://prefect-community.slack.com/archives/CL09KU1K7/p1649011482174799
• https://prefect-community.slack.com/archives/C0192RWGJQH/p1642702304024400
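A toy model can pin down the ring metaphor in question 1. This is not Prefect's Radar layout code. It assumes, as Keith does later in the thread, that a task sits one ring beyond the outermost task it depends on. The `ring_layout` function and its dependency dictionary are hypothetical. The model shows the layout wanted for `do_a`/`do_b`, and it also shows why tasks with no declared edges all end up on one ring.

```python
def ring_layout(deps: dict[str, list[str]]) -> dict[str, int]:
    """Assign each task to a ring: ring 0 for tasks with no upstream
    dependencies, otherwise one ring beyond its outermost upstream task."""
    rings: dict[str, int] = {}
    visiting: set[str] = set()  # tasks on the current recursion path

    def ring_of(name: str) -> int:
        if name in rings:
            return rings[name]
        if name in visiting:
            raise ValueError(f"dependency cycle at {name!r}; not a DAG")
        visiting.add(name)
        upstream = deps.get(name, [])
        # default=-1 puts tasks without upstream dependencies on ring 0
        ring = 1 + max((ring_of(u) for u in upstream), default=-1)
        visiting.discard(name)
        rings[name] = ring
        return ring

    for name in deps:
        ring_of(name)
    return rings


# Question 1a: do_b depends on do_a only through DB_A, so the edge must be declared.
print(ring_layout({"do_a": [], "do_b": ["do_a"]}))
# {'do_a': 0, 'do_b': 1}

# With no declared edges, every task lands on ring 0, whatever order it ran in.
print(ring_layout({"step_1": [], "step_2": [], "step_3": []}))
# {'step_1': 0, 'step_2': 0, 'step_3': 0}
```

The second call is the situation described for the `SequentialTaskRunner`. Execution order alone produces no edges, so in this model there is nothing to push a task outward.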
a
Thanks so much for your thoughtful feedback regarding visualization. I will forward it to the product team, and we'll keep you posted via #announcements once we have more to share about that use case.
o
Hi Keith, I'm not sure if I'm missing something in your post, but Radar seems to have no issues visualizing our control flows, at least after giving it a hint or two using `wait_for`. I use the default `ConcurrentTaskRunner`, though, even when running things in sequence. Could you have a look at the picture I've included in my introduction post? I'm curious whether that's what you're looking for. There are no data dependencies between the majority of our tasks. I agree that it'd be neat if it visualized things this way with less nudging, but we're otherwise happy with the result. https://prefect-community.slack.com/archives/C012PM4MRBM/p1660671573364479
This is what it looks like mid-run.
k
@Oscar Björhn I'll take another look at `wait_for`. I briefly tried it in this subflow, but all the tasks were just laid out on the same ring in arbitrary order rather than fanning out onto extended rings (which is what I assume that visual metaphor is supposed to mean: a wider ring has dependencies on things on an inner ring).
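```python
import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@flow(name="Pipeline Setup Flow", task_runner=SequentialTaskRunner())
async def setup_subflow():
    await do_step_1()
    # Passes the task object itself (not a PrefectFuture) to wait_for;
    # this is the attempt that left every task on the same ring.
    await do_step_2(wait_for=do_step_1)
    await do_step_3(wait_for=do_step_2)
    # ...
    # await do_step_n(wait_for=do_step_n_minus_1)
    # await do_step_n_plus_1(wait_for=do_step_n)


@task(name="Step 1")
async def do_step_1():
    await asyncio.sleep(5)


@task(name="Step 2")
async def do_step_2():
    await asyncio.sleep(5)

# ...
```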
It'd be good to see a code example with the resulting image to cement the intended usage. I found this in the API docs for `prefect.tasks`, which seems to indicate that I need to add the `wait_for` param to the `submit(...)` call, but my IDE suggests adding it directly on the task call. I tried the latter but will try the former. The code docs below also take the `PrefectFuture` return object as the input to `wait_for`. It's a little fumbly trying to figure this out right off the bat, and the value of the resulting visual is a big make-or-break criterion for tools like this. A dedicated section of the getting-started/tutorial docs showing how to enforce ordering/dependency control flow would be very welcome.
```
Enforce ordering between tasks that do not exchange data
>>> @task
>>> def task_1():
>>>     pass
>>>
>>> @task
>>> def task_2():
>>>     pass
>>>
>>> @flow
>>> def my_flow():
>>>     x = task_1.submit()
>>>
>>>     # task 2 will wait for task_1 to complete
>>>     y = task_2.submit(wait_for=[x])
```
o
Yeah, if you want it to actually visualize the dependency, you need to use `submit`, I think. But by including `wait_for`, you can still force synchronous execution. Try it out and let me know if it works. :)
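Oscar says that `wait_for` forces sequential execution even on the default concurrent runner. Plain `asyncio` has a direct analogy. This is a sketch, not Prefect's implementation: `run_step` and `pipeline` are hypothetical names, and an upstream `asyncio.Task` stands in for a `PrefectFuture`.

```python
import asyncio


async def run_step(name, seconds, log, wait_for=()):
    """Stand-in for a task: wait for upstream work, then 'work' for `seconds`."""
    if wait_for:
        # Suspend until every upstream asyncio.Task has finished.
        await asyncio.gather(*wait_for)
    await asyncio.sleep(seconds)
    log.append(name)


async def pipeline(use_wait_for):
    log = []
    # All three steps are scheduled at once on the same event loop (a concurrent
    # runner). Step 1 is slowest, so without waits it would finish last.
    s1 = asyncio.create_task(run_step("step_1", 0.05, log))
    s2 = asyncio.create_task(
        run_step("step_2", 0.03, log, wait_for=[s1] if use_wait_for else ())
    )
    s3 = asyncio.create_task(
        run_step("step_3", 0.01, log, wait_for=[s2] if use_wait_for else ())
    )
    await asyncio.gather(s1, s2, s3)
    return log


print(asyncio.run(pipeline(use_wait_for=False)))  # ['step_3', 'step_2', 'step_1']
print(asyncio.run(pipeline(use_wait_for=True)))   # ['step_1', 'step_2', 'step_3']
```

The runner stays concurrent in both runs. The only thing that imposes ordering is each step holding a handle to its upstream step and waiting on it.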
k
@Oscar Björhn thank you for this nudge! `.submit(wait_for=<PrefectFuture>)` is precisely what I needed, though it took a couple of tries to settle on that. What I figured out: my simple example above now works if written like this:
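```python
import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@flow(name="Pipeline Setup Flow", task_runner=SequentialTaskRunner())
async def setup_subflow():
    # Awaiting .submit() in an async flow returns a PrefectFuture
    f1 = await do_step_1.submit()
    # wait_for takes an iterable of futures, per the API docs
    f2 = await do_step_2.submit(wait_for=[f1])
    f3 = await do_step_3.submit(wait_for=[f2])
    # ...
    # fn = await do_step_n.submit(wait_for=[fn_minus_1])
    # fn_plus_1 = await do_step_n_plus_1.submit(wait_for=[fn])


@task(name="Step 1")
async def do_step_1():
    await asyncio.sleep(5)


@task(name="Step 2")
async def do_step_2():
    await asyncio.sleep(5)

# ...
```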
1. The API reference docs under `prefect.tasks` show that `Task.__call__(...)` and `Task.submit(...)` both support a `wait_for: Optional[Iterable[prefect.futures.PrefectFuture]]` parameter. However, it only seemed to have the desired effect on the UI when used with `.submit(wait_for=...)`, which chains each subsequent task one circle further out.
   a. Maybe that's a bug in the implementation of `Task.__call__(wait_for=some_future)`. I noticed that by default, when using `__call__`, it sets the task's task runner to `SequentialTaskRunner`, whereas with `submit(...)` it defaults to `None` and therefore falls back to the flow's `task_runner`. Maybe that's interfering?
   b. Still, even when the _flow's_ `task_runner=SequentialTaskRunner()`, the UI renders concentric/dependent circles correctly when `submit(wait_for=...)` is used.
2. The thing (object) you have to `wait_for` is a `PrefectFuture`, i.e. the return value of a `.submit()` call.
   And now that I mention that, this is probably why this only works with `.submit(...)` and not `__call__()`: calling the task directly, like `result = my_task()`, strips the `PrefectFuture` down to its result and returns that. My tasks don't return anything, so it'll be `None`... nothing to `wait_for`.
3. `PrefectFuture.wait()` doesn't do anything for the UI with dependent/concentric tasks.
   I feel there's an opportunity here. Calling `d = my_task.submit().wait()` should be an implicit `wait_for` for whatever task or group of tasks follows it.
   a. Maybe it's just a matter of changing the thinking: use `wait_for` on the next task or batch of concurrent tasks, rather than eagerly calling `.wait()` on the task you just submitted.
   b. But storing return objects and passing them forward to subsequent tasks is tedious. A good implementation would be for a call to `PrefectFuture.wait()` to freeze a ring on Radar and push all subsequent tasks to the next outer ring. And if there's a batch of concurrent tasks that need to be waited on, add a `Task.wait_all(tasks=Iterable[PrefectFuture])`. That way the batch can remain on the same ring, and all the tasks after it that must wait on it are bumped to an outer ring.
   c. Furthering this opportunity with `async`/`await` syntax: since `.wait()` (I think) has the same effect as `await my_task.submit()`, it'd be great if the rings were bumped out implicitly on encountering an awaited task.
   d. And on the batch/group side of things, if `asyncio.gather(t1.submit(), t2.submit(), t3.submit())` is encountered, it's "waiting" on that subset of 3 tasks, and everything after should be bumped out a ring.
4. I can't yet find a way to link subflows to each other. A subflow doesn't return a `PrefectFuture`, but the result of the flow (or its state). I guess I could try to force the subflow to return the `PrefectFuture` of its last task, but that feels hacky.
   a. Any tips here appreciated.
   b. I'd like to be able to run subflows in sequence, with a subsequent subflow having a `wait_for` condition on a prior subflow or a prior task. Either or.
   c. Or, if running a set of subflows in parallel using `asyncio.gather(sf1(), sf2(), sf3())`, allow the next task or subflow to `wait_for` the results/futures of all 3.
   d. I feel it'd perhaps be natural for a `Flow` to also have a `.submit()` function that returns a `PrefectFuture` too, like a `Task` does. Calling the flow with `__call__(...)` instead of `.submit()` would just return the result.

CC: @Anna Geller
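The behavior proposed in point 3 can be expressed as a small model. `RingRecorder` is hypothetical, as is the proposed `Task.wait_all`. Per the thread, `PrefectFuture.wait()` currently has no effect on the Radar layout. Edges come from futures passed to `wait_for`, or from data passed between tasks. The model treats a single `.wait()`, a batch `wait_all`, and an awaited `gather` the same way: each is a boundary that pushes later submissions outward.

```python
class RingRecorder:
    """Toy model of the proposal in point 3: tasks submitted between waits share
    a ring, and a wait (a single .wait(), a batch wait_all, or an awaited gather)
    pushes everything submitted afterwards one ring beyond the waited-on tasks."""

    def __init__(self) -> None:
        self.current_ring = 0
        self.rings: dict[str, int] = {}

    def submit(self, name: str) -> str:
        # A submitted task lands on whatever ring is current; returns a handle.
        self.rings[name] = self.current_ring
        return name

    def wait_all(self, *names: str) -> None:
        # Later submissions go one ring past the outermost waited-on task,
        # never moving back inward.
        if not names:
            return
        outermost = max(self.rings[n] for n in names)
        self.current_ring = max(self.current_ring, outermost + 1)

    def wait(self, name: str) -> None:
        # A single-future wait is just a batch of one.
        self.wait_all(name)


r = RingRecorder()
extract = r.submit("extract")
r.wait(extract)  # like my_task.submit().wait()
batch = [r.submit(f"clean_{i}") for i in (1, 2, 3)]
r.wait_all(*batch)  # like the proposed Task.wait_all, or an awaited gather
r.submit("load")
print(r.rings)
# {'extract': 0, 'clean_1': 1, 'clean_2': 1, 'clean_3': 1, 'load': 2}
```

The concurrent batch stays on one ring, as point 3b asks. Nothing needs to be threaded through `wait_for` by hand, because the wait itself is the boundary.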
z
> Maybe is that a bug in the implementation of `Task.__call__(wait_for=some_future)`. I noticed by default if using `__call__` it sets the tasks task_runner to `SequentialTaskRunner`, whereas with `submit(...)` it’s defaulted to `None` and will therefore fallback to the Flow’s `task_runner`. Maybe that’s interfering?
If this is the case, it is a bug. Can you open a minimal example on GitHub?
If you’re passing raw data, e.g. `None`, to `wait_for`, we definitely won’t be able to track the relationship.
If you have

```python
x = my_task()
y = my_task(x)
```

where your task returns an object that is unique (e.g. not a global singleton like `None`), we will do our best to track that relationship despite the lack of futures.
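A toy identity tracker illustrates Zanie's point about unique return values. This is a hypothetical model, not Prefect's tracking code, and `ResultTracker` and `record` are invented names. It shows why `None` carries no information: every task that returns nothing returns the same object. In CPython, small integers and `True`/`False` are shared objects too, so they would be just as ambiguous. The sketch special-cases only `None`.

```python
class ResultTracker:
    """Toy model: remember which task produced each result object (by identity)
    and infer an edge when that same object is later passed into another task."""

    def __init__(self) -> None:
        self._producer: dict[int, str] = {}
        self._keep_alive: list[object] = []  # keep results alive so ids stay unique
        self.edges: list[tuple[str, str]] = []

    def record(self, task_name: str, result: object, *args: object) -> object:
        # Any argument that is a known result links its producer to this task.
        for arg in args:
            upstream = self._producer.get(id(arg))
            if upstream is not None:
                self.edges.append((upstream, task_name))
        # None is one shared object returned by every task that returns nothing,
        # so its identity cannot say which task produced it; don't record it.
        if result is not None:
            self._producer[id(result)] = task_name
            self._keep_alive.append(result)
        return result


t = ResultTracker()
rows = t.record("extract", ["row1", "row2"])  # a fresh list: unique identity
t.record("load", None, rows)                   # edge extract -> load is inferred
nothing = t.record("step_1", None)
t.record("step_2", None, nothing)              # only None was passed: no edge
print(t.edges)  # [('extract', 'load')]
```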
k
RE: my point #2 above, which clarifies why I got no chaining of dependencies when not using `.submit()`:
> 2. The thing (object) you have to `wait_for` is a `PrefectFuture`, i.e. the return of a `.submit()` call.
> And now that I mention that, this is probably why this only works with `.submit(...)` and not `__call__()`, as calling the task directly like `result = my_task()` will strip the PrefectFuture down to its result and return that. My tasks don't return anything, so it'll be `None`... nothing to `wait_for`.
I didn't realize it could also just "wait_for" arbitrary data rather than a `PrefectFuture`. That's neat. But in my case, my tasks are not exchanging data directly, so they just return `None` when using `__call__()`. So maybe it's not a bug. But it is misleading to try to use the `wait_for` arg in `__call__()`, and the odd case where it can derive relationships from the data returned might be a minority case. It could be worth thinking about getting rid of that param in `__call__()` to force people to use `.submit()`. Or throw an error if `wait_for` in `__call__()` receives `None` (I like errors for training developers, but a runtime warning could also work). @Zanie
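The fail-fast suggestion could look roughly like this hypothetical guard. `validate_wait_for` and its `strict` flag are invented for illustration and are not part of Prefect. The guard only flags `None` entries inside the iterable, because a bare `wait_for=None` is indistinguishable from omitting the argument. `strict=False` gives the runtime-warning variant instead of the error.

```python
import warnings
from typing import Iterable, Optional


def validate_wait_for(wait_for: Optional[Iterable[object]], strict: bool = True) -> list:
    """Hypothetical guard: reject (or warn about) None entries in wait_for,
    since None cannot be traced back to any upstream task."""
    if wait_for is None:
        return []  # same as not passing wait_for at all: no upstream dependencies
    items = list(wait_for)
    if any(item is None for item in items):
        msg = (
            "wait_for contains None, which cannot be linked to an upstream task; "
            "call the upstream task with .submit() and pass its PrefectFuture instead"
        )
        if strict:
            raise ValueError(msg)
        warnings.warn(msg, RuntimeWarning, stacklevel=2)
    return items


result = None  # what `result = my_task()` gives for a task that returns nothing
try:
    validate_wait_for([result])
except ValueError as exc:
    print(f"ValueError: {exc}")
```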