# prefect-community
j
Hi! Given an unrun flow, what's the best way to get the inputs passed into the tasks (like the output of `task_runner.get_task_inputs(state, upstream_states)` but without having to run the flow)?
c
Hi Jeff! These inputs are necessarily unavailable, as they are the outputs of the upstream tasks after they have run
j
What do Task objects know about their interactions with other tasks, or is this all handled by the Flow context (since it manages the Edges and Task states)?
c
yea you hit the nail on the head -
> Task objects are standalone pieces of configurable logic. They typically can be:
> - configured through `__init__` (initialization)
> - run through `run()`, which can optionally receive inputs which always come from upstream tasks
and the Flow is responsible for tracking all dependency relationships
j
Could I easily get the input objects from the Flow (which are there but just unresolved until the upstream tasks are run)? i.e. If I wanted to run terminal tasks (and `await` their dependencies) with my own TaskRunner, instead of the current run-root-tasks-first approach
When triggering Task A to run, does roughly the following happen?
1. The flow follows the edges to find the subset of tasks it needs for Task A.
2. It has TaskRunner run the tasks, starting from the root tasks, extracting results from their final/finished states and passing along downstream edges. The root tasks are just Parameters, their inputs are accessible ahead of time in `prefect.context["parameters"]`
3. You can't easily access non-Param task inputs passed in `run`, because they come from the /final/ states of upstream tasks (and you can't just "wait" for these inputs, because you don't know which object they're from; the final upstream state object hasn't even been created)
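(Editor's sketch of the three steps above. This is a toy model, not Prefect code: `run_toy_flow`, the task names, and the `(upstream, downstream, key)` edge tuples are all made up for illustration. It only shows why a non-parameter task's inputs do not exist until its upstream tasks have finished.)

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+


def run_toy_flow(tasks, edges, parameters):
    """Hypothetical runner. tasks: name -> callable; edges: (upstream, downstream, key)."""
    # Only the graph knows who feeds whom; the callables themselves do not.
    graph = {name: set() for name in tasks}
    for upstream, downstream, _key in edges:
        graph[downstream].add(upstream)

    results = {}
    for name in TopologicalSorter(graph).static_order():  # root tasks come first
        if name in parameters:
            # Root "parameter" tasks: the value is known before the run starts.
            results[name] = parameters[name]
            continue
        # These inputs exist only now, after every upstream task has finished.
        inputs = {key: results[up] for up, down, key in edges if down == name}
        results[name] = tasks[name](**inputs)
    return results


tasks = {"x": None, "double": lambda n: n * 2, "report": lambda value: f"got {value}"}
edges = [("x", "double", "n"), ("double", "report", "value")]
results = run_toy_flow(tasks, edges, parameters={"x": 21})
```

In this model the input to `report` is simply a key in `results` that has not been written yet when the run begins, which matches the point in step 3.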
c
Ah ok so if you’re interested in testing how your downstream tasks respond to inputs, I’d recommend using the `task_states` keyword argument to either `flow.run` or `FlowRunner.run`, which is a dictionary of task objects -> prefect state objects. This would allow you to “mock” upstream success with arbitrary results
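(Editor's sketch of the idea behind seeding a run with finished states. This is a toy model in plain Python, not the Prefect implementation: the `Success` dataclass and `run_with_states` are hypothetical stand-ins, and tasks are keyed by name rather than by task object. It shows a downstream task being exercised against a mocked upstream result while the upstream task itself never runs.)

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Success:
    """Hypothetical stand-in for a finished state that carries a result."""
    result: Any


def run_with_states(tasks, edges, task_states=None):
    """tasks: name -> callable, listed in dependency order (dicts keep insertion order)."""
    states = dict(task_states or {})  # pre-seeded tasks count as already finished
    for name, fn in tasks.items():
        if name in states:
            continue  # mocked upstream: never executed
        inputs = {key: states[up].result for up, down, key in edges if down == name}
        states[name] = Success(fn(**inputs))
    return states


def expensive_query():
    raise RuntimeError("should not run when mocked")


tasks = {"query": expensive_query, "count_rows": lambda rows: len(rows)}
edges = [("query", "count_rows", "rows")]

# Pretend "query" already succeeded with an arbitrary result.
states = run_with_states(tasks, edges, task_states={"query": Success(result=[1, 2, 3])})
```

Here `states["count_rows"].result` is computed from the mocked list, and `expensive_query` is skipped entirely.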
j
Could I do something like: store A's upstream tasks through `A.__init__()` in `self.dependencies`, and access their results in `run()` or another method?
c
hmmm we usually don’t recommend storing any state on the task objects, as that state can be corrupted and / or lost in certain execution environments
j
How would I implement Task.get_direct_dependencies() here?
In this case, each Task.run() returns a list of Futures (as a workaround for `map` creating 100k tasks that each do the same 1 thing, we instead have 1 task do 100k things, i.e. submit 100k requests to an external slurm-like scheduler we use, which returns us 100k futures).
c
I don’t think you could implement that on the Task object itself, you’d probably need to implement it on the Flow object and add a “task” argument to it; Tasks themselves have no knowledge / hooks of their dependencies, they just run with the inputs they’re provided when they’re told to run by a runner
alternatively you could implement your own `TaskRunner` class and use it for execution -> this might actually be the best way to go
and this method would just be a new step in the task runner pipeline - you could override one of our internal steps and implement custom logic
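(Editor's sketch combining the two suggestions above: a dependency lookup that lives on the flow and takes a task argument, plus an extra runner step that waits on futures returned by upstream tasks. Everything here is hypothetical and written in plain Python: `ToyFlow`, `get_direct_dependencies`, and `resolve_futures` are not Prefect APIs, and a thread pool stands in for the external scheduler.)

```python
from concurrent.futures import Future, ThreadPoolExecutor


class ToyFlow:
    """Hypothetical flow: the only place that records dependency edges."""

    def __init__(self):
        self.edges = []  # (upstream_name, downstream_name)

    def add_edge(self, upstream, downstream):
        self.edges.append((upstream, downstream))

    def get_direct_dependencies(self, task):
        # Lives on the flow and takes the task as an argument,
        # because tasks hold no knowledge of their own dependencies.
        return [up for up, down in self.edges if down == task]


def resolve_futures(value):
    """Extra runner step: block on any futures an upstream task returned."""
    if isinstance(value, Future):
        return value.result()
    if isinstance(value, list):
        return [resolve_futures(item) for item in value]
    return value


flow = ToyFlow()
flow.add_edge("submit_jobs", "summarize")
deps = flow.get_direct_dependencies("summarize")

with ThreadPoolExecutor(max_workers=4) as pool:
    # One upstream task submits many jobs and returns a list of futures.
    upstream_result = [pool.submit(pow, n, 2) for n in range(5)]
    # The runner resolves them before handing inputs to the downstream task.
    inputs = resolve_futures(upstream_result)

total = sum(inputs)
```

Keeping the waiting logic in the runner step means the task's own `run` only ever sees plain values, and no state has to be stored on the task object.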