# prefect-community
Hi folks! I'm converting my code base to use Orion instead of v1, and I noticed that the ability to raise a SKIP signal (or something like it) is missing. In my original code, I have a sequence of tasks A -> B -> C -> D. Any of them can raise a SKIP signal, and if that happens, I want to skip all downstream tasks. It isn't a failure or a success; it's more of an "I'm not ready to run this yet, so don't run anything that depends on my output yet." Is that possible to do in Orion? I saw one earlier thread here about it, but the outcome was inconclusive...
• One option is to return a cancelled state, but that seems to suggest failure (which in my case would prompt a message to the admin, which I definitely don't want for SKIPs; SKIPs happen very often in my particular case, far more often than any other outcome).
• Another is to return a completed state, but then I need annoying if statements everywhere checking the outcome of previous tasks (skipped vs. actually run successfully).
Actually, the whole reason I started using Prefect in the first place was for its ability to easily control flows where things get skipped 😉
Also, in Part 5 of the blog series, Anna shows a screenshot of a partially successful flow with many tasks finishing with a […] state, which might be the state you are looking for?
a good way to think about it is: how would you build it in plain Python without Prefect? in v2, you can write if/else so there is no need for SKIP signals. It was necessary in v1 because such if/else wasn't possible
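A minimal sketch of the plain-Python approach suggested above: an early return stands in for the SKIP signal, so nothing downstream is called. The step names (collect, autograde, pipeline) and their return values are hypothetical and only illustrate the shape; per the message above, the same branching could presumably sit inside a v2 flow function.

```python
from typing import Optional


def collect(ready: bool) -> Optional[str]:
    """Hypothetical upstream step: returns None when it is not ready to run."""
    return "submissions" if ready else None


def autograde(submissions: str) -> str:
    """Hypothetical downstream step: only called when upstream produced output."""
    return f"grades for {submissions}"


def pipeline(ready: bool) -> Optional[str]:
    # Ordinary control flow replaces the SKIP signal: return early
    # and no downstream step is ever called.
    submissions = collect(ready)
    if submissions is None:
        return None
    return autograde(submissions)
```

A single early return per step keeps the code flat for a simple chain; deeply branching graphs may still need more structure than this.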
@Taylor Curran thanks for the link to the blog post! I read through the two pages you sent, and did indeed see the […] state. I found that […] is a […] state in:
```python
class StateType(AutoEnum):
    """Enumeration of state types."""

    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()
    CRASHED = AutoEnum.auto()
    PAUSED = AutoEnum.auto()
```

It looks like I shouldn't be using […] explicitly, since I imagine that will muck with orchestration because it isn't a terminal state? I would think the closest to what I need is […], but I think that counts as a failure... perhaps I need to test that, though.
@Anna Geller thanks for your response! My problem is that my code will start to look quite messy if I use the usual
```
if A didn't skip:
    if B didn't skip:
        if C didn't skip:
```
(not to mention that this A -> B -> C -> D thing is very simplified compared to what I'm actually doing -- my actual code will be a disaster if I try to if/else everything)
Being able to explicitly say SKIP and have all downstream tasks auto-SKIP as well was a very useful feature. Kind of hoping to replicate that somehow. My "bandaid" for now was to include a boolean in the output object:
```python
a = A()
b = B(a)
c = C(b)
d = D(c)
```
where the objects a, b, c, d have a skip boolean attribute, and then the functions set (--).skip = true internally. Then at the beginning of each task, it just checks whether its own input has the skip flag set, and if so, returns immediately with the skip flag set on the output too. Bit of a hack, but at least it avoids the crazy amount of if statements (my dependency graph of tasks isn't a chain, it's more of a tree...)
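A rough sketch of the skip-flag workaround described above, in plain Python. The Result class and the skippable decorator are hypothetical names introduced for illustration, not part of Prefect; the decorator moves the "check my inputs first" step into one place so each task body stays free of if statements, including when a task has several upstream inputs.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable


@dataclass
class Result:
    """Hypothetical output object carrying the skip flag."""
    value: Any = None
    skip: bool = False


def skippable(fn: Callable[..., Result]) -> Callable[..., Result]:
    """Return a skipped Result without calling fn if any input was skipped."""
    @wraps(fn)
    def wrapper(*inputs: Result) -> Result:
        if any(r.skip for r in inputs):
            return Result(skip=True)
        return fn(*inputs)
    return wrapper


@skippable
def A() -> Result:
    # Pretend A is not ready yet, so it marks its own output as skipped.
    return Result(skip=True)


@skippable
def B(a: Result) -> Result:
    return Result(value=f"B({a.value})")


@skippable
def C(a: Result, b: Result) -> Result:
    # Two upstream inputs, as in a tree-shaped dependency graph.
    return Result(value=f"C({a.value}, {b.value})")
```

Calling `C(a, B(a))` with `a = A()` returns a skipped Result without running the bodies of B or C.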
this looks quite complicated, why do you need this skipping? can you explain the problem you are trying to solve? there is certainly an easier way
is this some event-driven use case?
probably is an easier way 😄 let me try to explain
"I'm not ready to run this yet" sounds like the sensor pattern I wrote about here https://medium.com/the-prefect-blog/scheduled-vs-event-driven-data-pipelines-orchestrate-anything-with-prefect-b915e6adc3ba
The sensor pattern looks super useful. Definitely applicable to my case, I'll have to give that a read today. Here's what I'm doing (a slight simplification, but this should give you the gist): I'm using Prefect to run orchestration in my large undergraduate class. Basically, I have a flow that runs once per day:
• loops over all assignments
• checks whether the deadline has passed & student submission snapshot exists for that assignment
• collects assignments (Jupyter notebooks) and does some basic cleaning on them
• generates solutions and distributes them to students
• runs the autograder
• notifies TAs about manual grading tasks; waits for the TAs to finish their grading
• generates feedback forms
• uploads grades to our LMS
• returns the feedback forms to students
All of these things are probably best thought of as event-driven -- we should only collect assignments once the snapshot exists, we only autograde once the submissions are collected, etc. We do this currently by just running the flow once per day, and each task checks whether it's ready to go. Otherwise it skips, and then all downstream stuff should skip too. Note that some of these "events" are from an online learning management system, some are from the local filesystem, some are on a remote filesystem, etc. I have a plan to make my tasks into make-like tasks that check some notion of "timestamp" of input/output and re-run tasks when the output is older than the input, but that's something for later...
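A minimal sketch of the timestamp idea mentioned above, assuming both input and output are local files and using modification time as the "timestamp". The helper names (is_stale, run_if_stale) and the build callback are hypothetical; inputs that live in an LMS or on a remote filesystem would need their own notion of a timestamp.

```python
import os
from typing import Callable


def is_stale(input_path: str, output_path: str) -> bool:
    """Return True when the output is missing or older than the input."""
    if not os.path.exists(output_path):
        return True
    return os.path.getmtime(output_path) < os.path.getmtime(input_path)


def run_if_stale(
    input_path: str,
    output_path: str,
    build: Callable[[str, str], None],
) -> bool:
    """Call build(input_path, output_path) only if the output is stale.

    Returns True if build ran, False if the step was skipped.
    """
    if not is_stale(input_path, output_path):
        return False
    build(input_path, output_path)
    return True
```

The boolean return value could feed the same kind of skip decision discussed earlier in the thread.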
Thanks, this is extremely useful context, and there are so many ways to approach it. Given this requirement: "notifies TAs about manual grading tasks; waits for the TAs to finish their grading", I believe you may benefit from the pause() and resume functionality we are currently building. We plan to release it this Thursday, and it will roughly look like this:
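```python
import prefect

def my_task():
    ...  # task body not shown in the original message

def my_flow():
    # Planned call as sketched in this message; timeout is an int, N is a placeholder
    prefect.utilities.pause(timeout=N)
    data = my_task()
```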
and then once the TA finishes grading, they can click the "Resume" button in the UI to continue processing the workflow
It seems that no SKIP signal is needed, but it requires some thought to design it. I believe waiting for the Thursday release would be prudent, since it will vastly simplify the use case for you.