https://prefect.io

Philip MacMenamin

10/25/2021, 2:50 PM
Hi, at the top of my flow I'm doing sanity checking type stuff, if something isn't correct I
```python
raise signals.FAIL("Didn't work")
```
I do not want the flow to proceed. However, other downstream tasks occur despite the exception being raised. Is this expected behavior?

Kevin Kho

10/25/2021, 2:53 PM
Hey @Philip MacMenamin, yes this is expected. The signal is only for the task itself. The `FAIL` should propagate though, and those downstream ones should `FAIL` as well, right? You can also `raise SKIP`.

Philip MacMenamin

10/25/2021, 2:55 PM
I don't want the downstream tasks to even attempt to run if I fail this task.
(the downstream tasks appear to run, some appear to run successfully.)

Kevin Kho

10/25/2021, 2:57 PM
Are the downstream tasks connected to this `FAIL` task? Like, is the failed task an upstream dependency? If it’s an upstream dependency, the `FAIL` will propagate and they won’t run.

Philip MacMenamin

10/25/2021, 3:01 PM
the failed task should be an upstream dependency for everything else, either directly or indirectly. (As in, dependency of a dependency etc)

Kevin Kho

10/25/2021, 3:03 PM
In this example, `b` and `c` won’t trigger because `a` failed:
```python
import prefect
from prefect import task, Flow
from prefect.engine.signals import FAIL

@task()
def abc():
    raise FAIL("message")

@task()
def bcd():
    return 1

@task()
def cde():
    return 1

with Flow("test") as flow:
    a = abc()
    b = bcd(upstream_tasks=[a])
    c = cde(upstream_tasks=[b])

flow.run()
```
Will check that. Which is raising it? The `sanity_check`?

Philip MacMenamin

10/25/2021, 3:05 PM
The creation of the job object, before the sanity check even:
```python
import prefect
from prefect import task, Flow
from prefect.engine.signals import FAIL

@task()
def abc():
    raise FAIL("message")

@task()
def bcd():
    return 1

@task()
def cde():
    return 1

with Flow("test") as flow:
    a = abc()
    b = bcd(a)
    c = cde(b)

flow.run()
```
So, will the above still attempt to run bcd and cde?

Kevin Kho

10/25/2021, 3:07 PM
No it won’t, you can try it. `b` and `c` will be `TriggerFailed`, indicating something upstream failed.
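[Editor's note: the propagation Kevin describes can be sketched with a stdlib-only toy model. This is illustrative only, not Prefect's actual engine; `run_flow`, the state names, and the toy tasks are assumptions made for the sketch.]

```python
# Toy model of FAIL propagation: a task whose upstream did not succeed
# is marked TriggerFailed and never runs.
def run_flow(tasks, edges):
    """tasks: {name: callable}; edges: {name: [upstream names]}.
    Assumes insertion order of `tasks` is a valid topological order."""
    states = {}
    for name, fn in tasks.items():
        upstreams = edges.get(name, [])
        if any(states[u] != "Success" for u in upstreams):
            states[name] = "TriggerFailed"  # skip running: an upstream failed
            continue
        try:
            fn()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    return states

def abc():
    raise RuntimeError("message")  # stands in for raise FAIL("message")

states = run_flow(
    {"a": abc, "b": lambda: 1, "c": lambda: 1},
    {"b": ["a"], "c": ["b"]},
)
# states == {"a": "Failed", "b": "TriggerFailed", "c": "TriggerFailed"}
```

Note how the failure propagates transitively: `c` never sees `a` directly, but it is still trigger-failed because its own upstream `b` was.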

Philip MacMenamin

10/25/2021, 3:09 PM
ok, this is what I expected.

Kevin Kho

10/25/2021, 3:10 PM
Would you be able to show me the run schematic for the flow run?

Philip MacMenamin

10/25/2021, 3:12 PM
this is the init_job failing, and downstream tasks still running

Kevin Kho

10/25/2021, 3:19 PM
Been looking and I can’t see anything immediately wrong

Philip MacMenamin

10/25/2021, 3:28 PM
with my source? Or with the run I posted? To me it looks like things downstream are running when they should not be.

Kevin Kho

10/25/2021, 3:29 PM
With the code. I definitely see the problem lol

Philip MacMenamin

10/25/2021, 3:30 PM
oh, ok, heh cool. Thanks!

Kevin Kho

10/25/2021, 3:33 PM
Unfortunately, I don’t see anything. I even went to the utils and state_handler and can’t come up with any ideas

Philip MacMenamin

10/25/2021, 3:52 PM
so do we think it's a bug?

Kevin Kho

10/25/2021, 3:58 PM
Hard to say at the moment. Definitely open to making a ticket if you could provide a more minimal example? It’s kinda hard to exactly identify what the bug would be. If you don’t have time, I can try making an example later in the afternoon and we’ll see if I can reproduce?
You had parallel tasks, so I assume it was on the DaskExecutor that you saw this behavior, right?

Philip MacMenamin

10/26/2021, 6:37 PM
yes, sorry, got caught up with some other work for the last while. Correct, I'm using DaskExecutor.

Kevin Kho

10/26/2021, 6:39 PM
What Prefect version are you running?

Philip MacMenamin

10/26/2021, 6:56 PM
```json
{
  "config_overrides": {
    "server": {
      "ui": {
        "apollo_url": true
      }
    }
  },
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.8.0-1035-aws-x86_64-with-glibc2.29",
    "prefect_backend": "server",
    "prefect_version": "0.15.6",
    "python_version": "3.8.10"
  }
}
```

Kevin Kho

10/26/2021, 7:03 PM
Ok, we had a related issue but it was fixed in 0.15.6. Last question I have: do you experience it with `flow.run()`, or is it only when you run on Cloud?

Philip MacMenamin

10/26/2021, 7:21 PM
Fails correctly via `.run()` (i.e., it fails to init and downstream tasks do not start).

Kevin Kho

10/26/2021, 7:26 PM
Thanks! Will work on this.

Philip MacMenamin

10/26/2021, 7:39 PM
Looks like I can force it to work if I explicitly set everything via `upstream_tasks`:
```python
glb_fps = get_glb_fps(job, upstream_tasks=[chimera_ok, job, files, sanity_ok])
```
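[Editor's note: one way to reason about this workaround is that a task's effective upstream set is the union of its data dependencies (tasks passed as arguments, like `job`) and anything listed in `upstream_tasks`, with duplicates collapsing. A stdlib-only sketch of that bookkeeping, using task names as stand-ins for task objects; this models the idea, not Prefect's internals:]

```python
# Illustrative only: merge data dependencies and explicit upstream_tasks
# into a single de-duplicated upstream set.
def full_upstreams(data_deps, upstream_tasks=()):
    return set(data_deps) | set(upstream_tasks)

# Mirrors get_glb_fps(job, upstream_tasks=[chimera_ok, job, files, sanity_ok]):
deps = full_upstreams(["job"], ["chimera_ok", "job", "files", "sanity_ok"])
# deps == {"job", "chimera_ok", "files", "sanity_ok"}
```

Listing `job` in both places is harmless in this model, which is why the belt-and-suspenders workaround above can't over-count edges.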

Kevin Kho

10/26/2021, 8:07 PM
Ah thanks for circling back. That's weird though since the upstream task was already in the flow schematic.