    Philip MacMenamin
    11 months ago
    Hi, at the top of my flow I'm doing sanity-checking type stuff. If something isn't correct, I
    raise signals.FAIL("Didn't work")
    I do not want the flow to proceed. However, other downstream tasks run despite the exception being raised. Is this expected behavior?

    Kevin Kho
    11 months ago
    Hey @Philip MacMenamin, yes this is expected. The signal is only for the task itself. The FAIL should propagate though, and those downstream tasks should FAIL as well, right? You can also raise SKIP.
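    For reference, a minimal sketch of the SKIP variant (assuming Prefect 1.x; the task names here are illustrative). Downstream tasks end up Skipped too, because skip_on_upstream_skip defaults to True:
    from prefect import task, Flow
    from prefect.engine.signals import SKIP

    @task()
    def check():
        raise SKIP("nothing to do")

    @task()
    def downstream():
        return 1

    with Flow("skip-example") as flow:
        downstream(upstream_tasks=[check()])

    flow.run()  # check ends Skipped; downstream is Skipped as well (treated as success)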

    Philip MacMenamin
    11 months ago
    I don't want the downstream tasks to even attempt to run if I fail this task.
    (The downstream tasks appear to run; some appear to run successfully.)

    Kevin Kho
    11 months ago
    Are the downstream tasks connected to this FAIL task? Like, is the failed task an upstream dependency? If it's an upstream dependency, the FAIL will propagate and they won't run.

    Philip MacMenamin
    11 months ago
    the failed task should be an upstream dependency for everything else, either directly or indirectly. (As in, dependency of a dependency etc)

    Kevin Kho
    11 months ago
    In this example, b and c won't trigger because a failed:
    import prefect
    from prefect import task, Flow
    from prefect.engine.signals import FAIL
    
    @task()
    def abc():
        raise FAIL("message")
    
    @task()
    def bcd():
        return 1
    
    @task()
    def cde():
        return 1
    
    with Flow("test") as flow:
        a = abc()
        b = bcd(upstream_tasks=[a])
        c = cde(upstream_tasks=[b])
        
    flow.run()
    Will check that.
    Which is raising it? The sanity_check?

    Philip MacMenamin
    11 months ago
    the creation of the job object, before the sanity check even:
    import prefect
    from prefect import task, Flow
    from prefect.engine.signals import FAIL
    
    @task()
    def abc():
        raise FAIL("message")
    
    @task()
    def bcd(a):
        return 1
    
    @task()
    def cde(b):
        return 1
    
    with Flow("test") as flow:
        a = abc()
        b = bcd(a)  # data dependency instead of explicit upstream_tasks
        c = cde(b)
        
    flow.run()
    So, will the above still attempt to run bcd and cde?

    Kevin Kho
    11 months ago
    No it won’t, you can try it. b and c will be TriggerFailed, indicating something upstream failed.
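    A quick way to see this, assuming the abc/bcd/cde flow above (Prefect 1.x): flow.run() returns the flow's final State, and state.result maps each task to its own final state.
    from prefect.engine.state import TriggerFailed

    state = flow.run()
    print(state.result[a])                             # Failed("message")
    print(isinstance(state.result[b], TriggerFailed))  # True
    print(isinstance(state.result[c], TriggerFailed))  # True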

    Philip MacMenamin
    11 months ago
    ok, this is what I expected.

    Kevin Kho
    11 months ago
    Would you be able to show me the run schematic for the flow run?

    Philip MacMenamin
    11 months ago
    this is the init_job failing, and downstream tasks still running

    Kevin Kho
    11 months ago
    Been looking and I can’t see anything immediately wrong

    Philip MacMenamin
    11 months ago
    with my source? Or with the run I posted? To me it looks like things downstream are running when they should not be.

    Kevin Kho
    11 months ago
    With the code. I definitely see the problem in the run, lol.

    Philip MacMenamin
    11 months ago
    oh, ok, heh cool. Thanks!

    Kevin Kho
    11 months ago
    Unfortunately, I don’t see anything. I even went to the utils and state_handler and can’t come up with any ideas

    Philip MacMenamin
    11 months ago
    so do we think it's a bug?

    Kevin Kho
    11 months ago
    Hard to say at the moment. Definitely open to making a ticket if you could provide a more minimal example? It’s kind of hard to identify exactly what the bug would be. If you don’t have time, I can try making an example later in the afternoon and we’ll see if I can reproduce it.
    You had parallel tasks, so I assume it was on the DaskExecutor that you saw this behavior, right?
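    For anyone trying to reproduce, a minimal sketch of running the example flow above on Dask (Prefect 0.15.x import path):
    from prefect.executors import DaskExecutor

    # DaskExecutor() with no address spins up a temporary local Dask cluster
    flow.run(executor=DaskExecutor())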

    Philip MacMenamin
    10 months ago
    yes, sorry, got caught up with some other work for the last while. Correct, I'm using DaskExecutor.

    Kevin Kho
    10 months ago
    What Prefect version are you running?

    Philip MacMenamin
    10 months ago
    {
      "config_overrides": {
        "server": {
          "ui": {
            "apollo_url": true
          }
        }
      },
      "env_vars": [],
      "system_information": {
        "platform": "Linux-5.8.0-1035-aws-x86_64-with-glibc2.29",
        "prefect_backend": "server",
        "prefect_version": "0.15.6",
        "python_version": "3.8.10"
      }
    }

    Kevin Kho
    10 months ago
    Ok, we had a related issue, but it was fixed in 0.15.6. Last question I have: do you experience it with flow.run(), or is it only when you run on Cloud?

    Philip MacMenamin
    10 months ago
    Fails correctly via .run() (i.e., it fails to init and downstream tasks do not start).

    Kevin Kho
    10 months ago
    Thanks! Will work on this.

    Philip MacMenamin
    10 months ago
    Looks like I can force it to work if I explicitly set everything as upstream_tasks:
    glb_fps = get_glb_fps(job, upstream_tasks=[chimera_ok, job, files, sanity_ok])
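    A sketch of that workaround pattern (task names invented here to mirror the flow discussed above): pass the data dependency positionally and also pin the gating tasks via upstream_tasks.
    from prefect import task, Flow

    @task()
    def init_job():
        return {"id": 1}  # the real task raises FAIL on bad input

    @task()
    def sanity_check(job):
        return True

    @task()
    def process(job):
        return job

    with Flow("pipeline") as flow:
        job = init_job()
        sanity_ok = sanity_check(job)
        # belt and braces: data dependency plus explicit upstream_tasks
        result = process(job, upstream_tasks=[job, sanity_ok])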

    Kevin Kho
    10 months ago
    Ah, thanks for circling back. That's weird though, since the upstream task was already in the flow schematic.