Hi guys, I am using prefect scheduler, for flow to...
# ask-community
m
Hi guys, I am using the Prefect scheduler to repeat a flow. Unfortunately, our task lasts longer than the schedule interval, which means some runs overlap and things get messed up. Is it possible to set the scheduler so that the same flow is rerun only after the previous run has finished successfully, again and again?
a
Hi @Martin Durkac! Kevin has already given a good answer in this thread. In short: if you use Prefect Cloud and you are on a Standard or Enterprise plan, you can set the flow concurrency limit to 1 for this specific flow. This will ensure that only one FlowRun is active at any time. Any new scheduled flow run will be queued. Optionally, you can combine this with Automations to cancel late runs.
m
@Anna Geller thank you for the quick response! Maybe I should add that my Prefect runs on Prefect Server, not on Prefect Cloud. Does that mean it is not possible to limit flow concurrency on Server?
a
Concurrency limits are a Cloud-only feature. Alternatively, for Prefect Server, you could create a Flow-level state handler that creates a new flow run if the old state is Running and the new state is Finished. This would have the effect of a never-ending Flow, because any time your FlowRun reaches a Finished state, a new FlowRun will be created.
Or perhaps it might be better for you if this Flow-level state handler acts on the new state being Success, to avoid creating new flow runs if something is wrong in the actual flow logic.
m
Thank you 🙂 I need to figure out how to do it in Prefect
a
This can be a good start:
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run
import time


@task(log_stdout=True)
def hello_world():
    print("Sleeping...")
    time.sleep(4)  # to have enough time to kill it
    return "hello world"


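def never_ending_state_handler(obj, old_state, new_state):
    # Fire only on the Running -> Success transition, so a failed run
    # does not schedule another run.
    if old_state.is_running() and new_state.is_successful():
        # Call the task's run() directly to create the next run of this same flow.
        create_flow_run.run(flow_name="never-ending-flow", project_name="community")
    return new_state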


with Flow("never-ending-flow", state_handlers=[never_ending_state_handler]) as flow:
    hello_task = hello_world()
k
Maybe this is intentional, but just be careful that this can cause an infinite loop.
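One way to keep the loop stoppable is to put a kill switch in front of the call that creates the next run. The sketch below is only an illustration: the environment variable name `NEVER_ENDING_FLOW_STOP` and the helper `make_guarded_handler` are hypothetical, not part of Prefect, and the trigger is passed in as a plain callable so the guard logic can be tried without a Prefect backend.

```python
import os
from typing import Callable

# Hypothetical kill-switch name (an assumption, not a Prefect setting).
STOP_ENV_VAR = "NEVER_ENDING_FLOW_STOP"


def make_guarded_handler(trigger_next_run: Callable[[], None]):
    """Build a flow-level state handler that re-triggers the flow
    only while the kill switch is not set."""

    def handler(obj, old_state, new_state):
        # Read the switch on every transition so it can be flipped at any time.
        stop_requested = os.environ.get(STOP_ENV_VAR, "") == "1"
        if (
            old_state.is_running()
            and new_state.is_successful()
            and not stop_requested
        ):
            trigger_next_run()
        # A state handler must return the (possibly modified) new state.
        return new_state

    return handler
```

With Prefect 0.15.0 or later, the callable could be `lambda: create_flow_run.run(flow_name="never-ending-flow", project_name="community")`, and the resulting handler would be passed in `state_handlers=[...]` as in the snippet above. The variable would have to be set in the environment where the flow run actually executes, which depends on your agent setup.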
m
Hi again, can I ask which version of Prefect I need to install? My version, 0.14.21, throws ImportError: cannot import name 'create_flow_run' from 'prefect.tasks.prefect'
a
@Martin Durkac you're right - this task was added as part of the 0.15.0 release. There is even a blog post explaining everything new in this release.
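If you want to fail early with a clearer message than the ImportError, a small version check can help. This is a minimal sketch: `parse_version` and `supports_create_flow_run` are hypothetical helper names, and the parsing is deliberately simple (it only looks at the leading digits of the first three dot-separated parts).

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Turn a string such as '0.14.21' into a comparable tuple (0, 14, 21)."""
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break  # ignore suffixes such as 'rc1' or '+local'
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # pad short versions like '1.2'
    return (parts[0], parts[1], parts[2])


def supports_create_flow_run(version: str) -> bool:
    """create_flow_run was added in 0.15.0, per the message above."""
    return parse_version(version) >= (0, 15, 0)
```

In a real environment you would pass in `prefect.__version__`; with 0.14.21 the check returns False, which means an upgrade is needed before the import will work.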
m
@Anna Geller thank you very much! Maybe the old version of Prefect was the problem in all my previous attempts 🤔