# prefect-community

Sam Garvis

09/21/2022, 6:56 PM
Is there a best method for running a flow as frequently as possible? I tried putting the flow in a while True loop, like I read in a Prefect article. But every single time, after a few hours I get an internal 500 error and it shuts down the flow. I would prefer not to start an entirely new flow run each time because setting up the flow takes more time, but this might be the best option.
I'm also afraid to put it on a cron schedule for every minute, because a run could sometimes take longer and I want to avoid late flows. The flow itself takes around 11 seconds, though.
This is for Prefect 2.0.
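For reference, the looping approach described in the question can be sketched as below. This is a minimal sketch, assuming the loop wraps the flow call from the outside and that a transient failure should be logged rather than allowed to end the loop. The names `run_forever`, `run_once`, `min_interval_seconds`, `max_iterations`, and `sleep` are hypothetical and are not part of Prefect. It does not address whatever causes the 500 error itself.

```python
import time
from typing import Callable, Optional


def run_forever(
    run_once: Callable[[], None],
    min_interval_seconds: float = 0.0,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call run_once repeatedly and return the number of failed iterations.

    run_once stands in for the flow call (hypothetical). An exception in one
    iteration is reported and the loop moves on to the next iteration.
    """
    failures = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        started = time.monotonic()
        try:
            run_once()
        except Exception as exc:
            # Keep looping on failure instead of letting one error stop everything.
            failures += 1
            print(f"iteration {iteration} failed: {exc!r}")
        # Sleep only for whatever is left of the minimum interval, so a slow
        # run is followed immediately by the next one.
        elapsed = time.monotonic() - started
        sleep(max(0.0, min_interval_seconds - elapsed))
        iteration += 1
    return failures
```

With `max_iterations=None` the loop never returns; the bound exists mainly so the behaviour can be tried out safely.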

Alix Cook

09/21/2022, 7:10 PM
I implemented a decorator for my flows that kicks off the flow again once the flow completes (whether it errored or not)
```python
import asyncio
from datetime import datetime, timedelta, timezone
from uuid import UUID

from django.conf import settings
from prefect import task, get_run_logger
from prefect.context import get_run_context
from prefect.client import OrionClient, get_client
from prefect.orion.schemas.states import Scheduled


async def __reschedule_flow(flow_run_id: UUID):
    client: OrionClient
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        if "schedule:disabled" in flow_run.tags:
            # if scheduling is disabled for this flow, don't run it
            return
        logger = get_run_logger()
        logger.info("Scheduling new flow run for realtime flow")
        # timezone-aware timestamp so the scheduled time is unambiguous
        state = Scheduled(
            scheduled_time=datetime.now(timezone.utc)
            + timedelta(seconds=settings.PREFECT_REALTIME_DELAY_SECONDS),
        )
        await client.create_flow_run_from_deployment(
            deployment_id=flow_run.deployment_id,
            state=state,
        )


@task(retries=5, retry_delay_seconds=5)
def schedule_flow():
    # sync task that drives the async client via asyncio.run
    run_ctx = get_run_context()
    asyncio.run(__reschedule_flow(run_ctx.task_run.flow_run_id))
```
It looks something like this if you just put it in task form (sorry, it's sync calling out to async; I just needed to do this for some not-great reasons).
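The reschedule coroutine makes two decisions: whether to skip because of the opt-out tag, and when the next run should start. Those can be pulled into a pure function that is easy to check without a Prefect server. This is a sketch only; `next_scheduled_time` and `DISABLE_TAG` are hypothetical names, and using timezone-aware UTC timestamps is an assumption about what the server expects.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

# Tag value taken from the snippet above; the constant name is hypothetical.
DISABLE_TAG = "schedule:disabled"


def next_scheduled_time(
    tags: Iterable[str],
    delay_seconds: float,
    now: Optional[datetime] = None,
) -> Optional[datetime]:
    """Return when the next run should start, or None if rescheduling is disabled."""
    if DISABLE_TAG in tags:
        return None
    # Default to the current time as a timezone-aware UTC datetime.
    current = now if now is not None else datetime.now(timezone.utc)
    return current + timedelta(seconds=delay_seconds)
```

Passing `now` explicitly keeps the function deterministic, which makes it simpler to test than code that reads the clock directly.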
And if you want to use it as a decorator:
```python
import functools


def realtime_flow(fn):
    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        try:
            ret = fn(*args, **kwargs)
        except Exception:
            # reschedule even when the flow body fails, then re-raise
            schedule_flow()
            raise
        schedule_flow()
        return ret

    return decorated
```
Then your flow could look something like:
```python
from prefect import flow


@flow
@realtime_flow
def my_cool_flow():
    pass
```
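To see the wrapper's behaviour without a Prefect server, here is a small stand-alone sketch of the same pattern with the scheduling call replaced by a plain callback. `reschedule_after` and the `scheduled` list are hypothetical stand-ins for illustration. It uses `try`/`finally` rather than `except Exception`, so, unlike the version above, the callback also fires on `BaseException` subclasses such as `KeyboardInterrupt`.

```python
import functools

scheduled = []  # records each "next run" request in place of a real scheduler


def reschedule_after(schedule):
    """Build a decorator that calls schedule() once the wrapped function finishes."""

    def decorator(fn):
        @functools.wraps(fn)
        def decorated(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                # Runs whether fn returned normally or raised.
                schedule()

        return decorated

    return decorator


@reschedule_after(lambda: scheduled.append("next run"))
def succeeds():
    return "ok"


@reschedule_after(lambda: scheduled.append("next run"))
def fails():
    raise ValueError("boom")
```

Calling `succeeds()` and then `fails()` leaves two entries in `scheduled`, one per call, and the `ValueError` still propagates to the caller.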

Cole Murray

09/21/2022, 7:56 PM
Can you add additional context on the use-case?

Sam Garvis

09/21/2022, 8:24 PM
@Cole Murray Not sure if you're asking me or Alix, but I need to catch a type of error ASAP: easily under 1 minute and hopefully within 30 seconds. Constantly running this flow would make that possible, but I can't find a way that actually works.

Faheem Khan

09/21/2022, 11:00 PM
@Sam Garvis I had the same issue with a local server, using the Dask task runner and Prefect 2.4. I then downgraded to Prefect 2.04 and everything has run fine since.