# ask-community
p
Hi, at the top of my flow I'm doing sanity-checking type stuff. If something isn't correct, I do:
```python
from prefect.engine import signals
raise signals.FAIL("Didn't work")
```
I do not want the flow to proceed. However, other downstream tasks occur despite the exception being raised. Is this expected behavior?
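The pattern being described is roughly this (a minimal sketch assuming Prefect 1.x; `sanity_check`, `run_job`, and the config dict are hypothetical stand-ins for the real flow):
```python
from prefect import task, Flow
from prefect.engine import signals

@task
def sanity_check(cfg):
    # hypothetical check: raising FAIL marks this task Failed immediately
    if not cfg.get("ok"):
        raise signals.FAIL("Didn't work")
    return cfg

@task
def run_job(cfg):
    return cfg

with Flow("sanity") as flow:
    checked = sanity_check({"ok": False})
    run_job(checked)
```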
k
Hey @Philip MacMenamin, yes this is expected. The signal is only for the task itself. The `FAIL` should propagate though, and those downstream tasks should `FAIL` as well, right? You can also `raise SKIP`.
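For reference, both signals raised from inside a task might look like this (a minimal sketch; in Prefect 1.x a `SKIP` counts as a success, and downstream tasks skip as well by default because `skip_on_upstream_skip=True`):
```python
from prefect import task
from prefect.engine.signals import FAIL, SKIP

@task
def check(x):
    if x is None:
        # FAIL: this task ends Failed; downstream tasks become TriggerFailed
        raise FAIL("no input")
    if x == 0:
        # SKIP: this task ends Skipped; downstream tasks skip too by default
        raise SKIP("nothing to do")
    return x
```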
p
I don't want the downstream tasks to even attempt to run if I fail this task.
(the downstream tasks appear to run, some appear to run successfully.)
k
Are the downstream tasks connected to this `FAIL` task? Like, is the failed task an upstream dependency? If it's an upstream dependency, the `FAIL` will propagate and they won't run.
p
the failed task should be an upstream dependency for everything else, either directly or indirectly. (As in, dependency of a dependency etc)
k
In this example, `b` and `c` won't trigger because `a` failed:
```python
from prefect import task, Flow
from prefect.engine.signals import FAIL

@task()
def abc():
    raise FAIL("message")

@task()
def bcd():
    return 1

@task()
def cde():
    return 1

with Flow("test") as flow:
    a = abc()
    b = bcd(upstream_tasks=[a])
    c = cde(upstream_tasks=[b])

flow.run()
```
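Running that example, the final states can be inspected programmatically (a sketch against the code above):
```python
from prefect.engine.state import TriggerFailed

state = flow.run()
# a's FAIL propagates: b and c never execute and end as TriggerFailed
assert state.result[a].is_failed()
assert isinstance(state.result[b], TriggerFailed)
assert isinstance(state.result[c], TriggerFailed)
```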
p
Will check that.
k
Which one is raising it? The `sanity_check`?
p
The creation of the job object, before the sanity check even runs.
```python
from prefect import task, Flow
from prefect.engine.signals import FAIL

@task()
def abc():
    raise FAIL("message")

@task()
def bcd(x):
    return 1

@task()
def cde(x):
    return 1

with Flow("test") as flow:
    # data dependencies instead of explicit upstream_tasks
    a = abc()
    b = bcd(a)
    c = cde(b)

flow.run()
```
So, will the above still attempt to run `bcd` and `cde`?
k
No it won't, you can try it. `b` and `c` will be `TriggerFailed`, indicating something upstream failed.
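The mechanism here is the task trigger: Prefect 1.x tasks default to the `all_successful` trigger, which is what produces the `TriggerFailed` state. A task can opt out if it genuinely needs to run after failures, e.g. a cleanup step (sketch):
```python
from prefect import task
from prefect.triggers import all_finished

# runs once upstream tasks are finished, regardless of their states
@task(trigger=all_finished)
def cleanup():
    return "cleaned up"
```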
p
ok, this is what I expected.
k
Would you be able to show me the run schematic for the flow run?
p
This is the `init_job` failing, and downstream tasks still running.
k
Been looking and I can’t see anything immediately wrong
p
With my source? Or with the run I posted? To me it looks like things downstream are running when they should not be.
k
With the code. I definitely see the problem in the run though, lol.
p
oh, ok, heh cool. Thanks!
k
Unfortunately, I don't see anything. I even went through the utils and `state_handler` and can't come up with any ideas.
p
so do we think it's a bug?
k
Hard to say at the moment. Definitely open to making a ticket if you could provide a more minimal example? It’s kinda hard to exactly identify what the bug would be. If you don’t have time, I can try making an example later in the afternoon and we’ll see if I can reproduce?
You had parallel tasks, so I assume it was on the DaskExecutor that you saw this behavior, right?
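For anyone reading along, wiring a Dask executor into a local run looks roughly like this (a sketch; `LocalDaskExecutor` parallelizes tasks on local threads or processes):
```python
from prefect.executors import LocalDaskExecutor

# branches may now execute concurrently rather than sequentially
flow.run(executor=LocalDaskExecutor(scheduler="threads"))
```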
p
yes, sorry, got caught up with some other work for the last while. Correct, I'm using DaskExecutor.
k
What Prefect version are you running?
p
```json
{
  "config_overrides": {
    "server": {
      "ui": {
        "apollo_url": true
      }
    }
  },
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.8.0-1035-aws-x86_64-with-glibc2.29",
    "prefect_backend": "server",
    "prefect_version": "0.15.6",
    "python_version": "3.8.10"
  }
}
```
k
Ok, we had a related issue but it was fixed in 0.15.6. Last question I have: do you experience it with `flow.run()`, or is it only when you run on Cloud?
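The distinction: `flow.run()` executes everything in-process, while a backend run is registered first and picked up by an agent (a sketch; the project name is a placeholder):
```python
# local, in-process execution
flow.run()

# orchestrated execution: register, then an agent runs it via the backend
flow.register(project_name="my-project")
```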
p
It fails correctly via `.run()` (i.e., it fails to init and downstream tasks do not start).
k
Thanks! Will work on this.
p
Looks like I can force it to work if I explicitly set everything as `upstream_tasks`.
```python
glb_fps = get_glb_fps(job, upstream_tasks=[chimera_ok, job, files, sanity_ok])
```
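A runnable sketch of that workaround, with placeholder task bodies standing in for the real flow:
```python
from prefect import task, Flow

@task
def init_job():
    return {"id": 1}

@task
def sanity_check(job):
    return True

@task
def get_glb_fps(job):
    return []

with Flow("workaround") as flow:
    job = init_job()
    sanity_ok = sanity_check(job)
    # list every prerequisite explicitly, even ones already implied
    # by the data dependency on `job`
    glb_fps = get_glb_fps(job, upstream_tasks=[job, sanity_ok])
```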
k
Ah thanks for circling back. That's weird though since the upstream task was already in the flow schematic.