Trevor Campbell
11/25/2022, 8:11 PMA -> B -> C -> D
. Any of them can raise a SKIP signal, and if that happens, I want to skip all downstream tasks. It isn't a failure or a success, it's more of a "I'm not ready to run this yet, so don't run anything that depends on my output yet"
Is that possible to do in Orion? I saw one earlier thread here about it, but the outcome was inconclusive...
• one option is just to return a cancelled state, but that seems to suggest failure (which in my case would prompt a message to the admin, which I definitely don't want to happen for SKIPs. SKIPs happen very often in my particular case -- far more common than any other outcome)
• another is to return a completed state, but then I need annoying if
statements everywhere checking the outcome of previous tasks (skip vs. was actually run successfully). Actually, the whole reason I started using Prefect in the first place was for its ability to easily control flows where things get skipped 😉Taylor Curran
11/28/2022, 2:21 PMNotReady
state, which might be the state you are looking for?Anna Geller
11/28/2022, 5:44 PMTrevor Campbell
11/28/2022, 5:58 PMNotReady
state. I found that NotReady
is a PENDING
state in src/prefect/orion/schemas/states.py
:
class StateType(AutoEnum):
"""Enumeration of state types."""
SCHEDULED = AutoEnum.auto()
PENDING = AutoEnum.auto()
RUNNING = AutoEnum.auto()
COMPLETED = AutoEnum.auto()
FAILED = AutoEnum.auto()
CANCELLED = AutoEnum.auto()
CRASHED = AutoEnum.auto()
PAUSED = AutoEnum.auto()
TERMINAL_STATES = {
StateType.COMPLETED,
StateType.CANCELLED,
StateType.FAILED,
StateType.CRASHED,
}
It looks like I shouldn't be using PENDING
explicitly, since I imagine that will muck with orchestration, since it isn't a terminal state? I would think the closest to what I need is CANCELLED
, but I think that counts as a failure...perhaps I need to test that though.
@Anna Geller thanks for your response! My problem is that my code will start to look quite messy if I use the usual if/else
stuff:
A()
if A didn't skip:
B()
if B didn't skip:
C()
if C didn't skip:
D()
(not to mention that this A->B->C->D
thing is very simplified compared to what I'm actually doing -- my actual code will be a disaster if I try to if/else everything)a = A()
b = B(a)
c = C(b)
d = D(c)
where the objects a, b, c, d
have a .skip
boolean attribute, and then the functions A,B,C,D
set (--).skip = true
internally. Then at the beginning of each task, it just checks if its own input has the .skip
flag set, and if so, returns immediately with the .skip=true
on the output too.
Bit of a hack but at least it avoids the crazy amount of if statements (my dependency graph of tasks isn't a chain, it's more of a tree...)Anna Geller
11/28/2022, 6:07 PMTrevor Campbell
11/28/2022, 6:07 PMAnna Geller
11/28/2022, 6:07 PMTrevor Campbell
11/28/2022, 6:14 PMmake
-like tasks that check some notion of "timestamp" of input/output and re-run tasks when the output is older than the input, but that's something for later...Anna Geller
11/28/2022, 7:47 PM@task
def my_task():
...
@flow
def my_flow():
...
prefect.utilities.pause(timeout: int = N)
data = my_task()
...