I think I know what is happening but I’m guessing a bit. If your code is simple enough to share, I could try a Flow with a similar structure.
For the basic example above, the
ENDRUN
is causing the Flow to fail. In your case, I think what is happening is that you have other tasks not related to the task that raised the
ENDRUN
. These are your reference tasks that determine the success or failure of the flow. You can explicitly include the task that raises ENDRUN as a reference task like this:
from prefect import Flow, task, unmapped
import time
from prefect.engine.signals import ENDRUN, FAIL
from prefect.engine.state import Failed
@task
def aws_list_files(a):
time.sleep(1)
if a == 1:
raise ENDRUN(state=Failed())
return a
with Flow("xxx") as flow:
a = aws_list_files(1)
b = aws_list_files(2, upstream_tasks=[a])
c = aws_list_files(2)
flow.set_reference_tasks([a,c])
flow.run()
In this example, if I set my reference task to
c
only, the flow succeeds even if
a
and
b
fail. I think in your case, it’s a matter of explicitly making the
ENDRUN
task an
upstream_task
or setting it as a reference task.