Hello Everyone, does anybody know how to run a flow every 15 mins except if it is already running?
# ask-community
r
Hello Everyone, does anybody know how to run a flow every 15 mins except if it is already running?
k
Hey @Ross Timm, you can use Flow Concurrency on the Standard tier so that the following run is delayed until the previous one completes. If you just want it to exit instead, you would need to put a first task in the Flow that queries the GraphQL API, and if there is already a running flow, raise the ENDRUN signal.
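(For illustration, a rough sketch of that first-task idea, assuming Prefect 1.x against Prefect Cloud/Server. The flow name, the exact GraphQL query shape, and the choice of a Skipped state are assumptions for the sketch, not code shared in this thread; note that, as confirmed later in the thread, ENDRUN only ends this task, so downstream tasks should depend on it.)
Copy code
import prefect
from prefect import Client, task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Skipped

@task
def exit_if_already_running(flow_name: str):
    # First task: ask the backend whether another run of this flow is Running.
    query = """
    query {
      flow_run(
        where: {
          flow: { name: { _eq: "%s" } }
          state: { _eq: "Running" }
        }
      ) { id }
    }
    """ % flow_name
    result = Client().graphql(query)
    # id of the current run when executed by an agent (assumption: backend run)
    current_run_id = prefect.context.get("flow_run_id")
    other_runs = [
        run for run in result["data"]["flow_run"] if run["id"] != current_run_id
    ]
    if other_runs:
        # End this task early; downstream tasks should be set up to depend on it
        raise ENDRUN(state=Skipped("Another run of this flow is already in progress"))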
r
Thank you Kevin!
@Kevin Kho Hey Kevin, if I raise the ENDRUN signal in the task, it just cancels the task and leaves a successful state for the flow. So if I want to cancel the task AND cancel the flow, what's the best way to accomplish that?
k
Will look into this.
r
Thanks
k
Are you using it like this?
Copy code
from prefect import Flow, task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Failed

@task
def aws_list_files(a):
    if a == 1:
        # End this task run immediately with a Failed state
        raise ENDRUN(state=Failed())
    return a

with Flow("xxx") as flow:
    aws_list_files(1)

flow.run()
r
Yep, basically that, but instead of the state Failed() I use Cancelled().
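(Presumably something along these lines, with Cancelled swapped in for Failed; this is a guess at the variation being described, not code shared in the thread:)
Copy code
from prefect import Flow, task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Cancelled

@task
def aws_list_files(a):
    if a == 1:
        # End this task run in a Cancelled state instead of Failed
        raise ENDRUN(state=Cancelled("Flow is already running"))
    return a

with Flow("xxx") as flow:
    aws_list_files(1)

flow.run()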
k
I see what you mean. I’ll test it.
I think I know what is happening, but I'm guessing a bit. If your code is simple enough to share, I could try a Flow with a similar structure. For the basic example above, the ENDRUN is causing the Flow to fail. In your case, I think what is happening is that you have other tasks not related to the task that raised the ENDRUN, and those are your reference tasks that determine the success or failure of the flow. You can explicitly include the task that raises ENDRUN as a reference task like this:
Copy code
import time

from prefect import Flow, task
from prefect.engine.signals import ENDRUN
from prefect.engine.state import Failed

@task
def aws_list_files(a):
    time.sleep(1)
    if a == 1:
        raise ENDRUN(state=Failed())
    return a

with Flow("xxx") as flow:
    a = aws_list_files(1)
    b = aws_list_files(2, upstream_tasks=[a])  # b only runs if a succeeds
    c = aws_list_files(2)                      # c has no dependency on a

# The final flow state is determined by these reference tasks
flow.set_reference_tasks([a, c])
flow.run()
In this example, if I set my reference task to c only, the flow succeeds even if a and b fail. I think in your case, it's a matter of explicitly making the ENDRUN task an upstream_task or setting it as a reference task.
I'll ask the team whether ENDRUN should stop all processing whatsoever. I'm a bit unclear on this.
r
Ok, thank you. So you're saying that if it's marked as a reference task and gets cancelled, it should in theory cancel or fail the flow?
k
Yes, the flow will be marked as failed if it's a reference task, but I am a bit confused why c still runs even after the ENDRUN signal was raised, so I'll ask the team about that part.
r
Ok, great. It's mostly helpful as a visual cue to see the flow cancelled instead of showing successful and then having to go check the logs.
k
Yep pretty sure the reference task will make this work for the visual cue, but just ping me if it still doesn’t.
r
Hey Kevin, I just set that particular task as a reference task, but I'm still not getting the desired visual cue / flow run result.
k
Is your Flow simple enough to share?
Also, I got confirmation from the team that ENDRUN only ends the current task but not the flow run. The docs will be updated to reflect this.
r
Could I put like a state handler on that task and have it cancel the flow with that?
k
For the visual cue, a state handler can certainly be the answer (but I do think we're just a tiny bit off before we have to go there). For a way to end the Flow process entirely, this can't be done at the task state handler level. You can also raise FAIL explicitly instead of ENDRUN, but I think the issue is that the task is failing while the terminal tasks are succeeding.
We could address this with a terminal state handler, but I think the goal is to not have things run at all? Addressing it there would mean that we didn't stop them from running. You can DM me the Flow if you don't want to share it publicly (just leave the sensitive info out). You could also send me a schematic.
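(For completeness, a rough sketch of the state-handler idea for the visual cue, assuming Prefect 1.x with a Cloud/Server backend. The cancel_flow_run call and the flow_run_id context key are assumptions to verify against your Prefect version, and per the note above this only helps with the flow run's displayed state rather than guaranteeing nothing else runs:)
Copy code
import prefect
from prefect import Client, task
from prefect.engine.state import Cancelled

def cancel_flow_on_task_cancel(task_obj, old_state, new_state):
    # Task-level state handler: if this task ends up Cancelled, ask the
    # backend to cancel the whole flow run so the UI shows Cancelled.
    if isinstance(new_state, Cancelled):
        flow_run_id = prefect.context.get("flow_run_id")
        if flow_run_id:
            Client().cancel_flow_run(flow_run_id)
    return new_state

@task(state_handlers=[cancel_flow_on_task_cancel])
def guard_task():
    # Put the "is another run already active?" check here and raise
    # ENDRUN(state=Cancelled(...)) if so.
    pass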