Sam Garvis
09/21/2022, 6:56 PM
Alix Cook
09/21/2022, 7:10 PM
import asyncio
from uuid import UUID
from django.conf import settings
from prefect import task, get_run_logger
from prefect.context import get_run_context
from prefect.client import OrionClient, get_client
from prefect.orion.schemas.states import Scheduled
from datetime import datetime, timedelta
async def __reschedule_flow(flow_run_id: UUID):
    """Queue a follow-up run of the deployment behind ``flow_run_id``.

    Reads the flow run through the Prefect client and, unless the run carries
    the ``schedule:disabled`` tag, creates a new flow run from the same
    deployment in a ``Scheduled`` state that is
    ``settings.PREFECT_REALTIME_DELAY_SECONDS`` seconds in the future.

    Args:
        flow_run_id: ID of the flow run whose deployment should run again.
    """
    client: OrionClient
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        if "schedule:disabled" in flow_run.tags:
            # Scheduling is disabled for this flow, so don't queue another run.
            return

        logger = get_run_logger()
        logger.info("Scheduling new flow run for realtime flow")

        # Attach the local UTC offset so the scheduled time is unambiguous; a
        # naive ``datetime.now()`` carries no timezone and can be misread as
        # UTC. NOTE(review): confirm against Prefect's datetime handling.
        delay = timedelta(seconds=settings.PREFECT_REALTIME_DELAY_SECONDS)
        state = Scheduled(scheduled_time=datetime.now().astimezone() + delay)
        await client.create_flow_run_from_deployment(
            deployment_id=flow_run.deployment_id,
            state=state,
        )
@task(retries=5, retry_delay_seconds=5)
def schedule_flow():
    """Reschedule the flow run that the current task run belongs to.

    Looks up the flow run ID from the active run context and drives the async
    ``__reschedule_flow`` coroutine to completion from this synchronous task.
    The task is declared with 5 retries, 5 seconds apart.
    """
    current_flow_run_id = get_run_context().task_run.flow_run_id
    # Sync-to-async bridge: run the coroutine on a fresh event loop.
    reschedule = __reschedule_flow(current_flow_run_id)
    asyncio.run(reschedule)
# ^ Looks something like this if you just put it in task form (sorry, it's a
#   sync task calling out to async; I just needed to do this for some
#   not-great reasons).
def realtime_flow(fn):
    """Decorate ``fn`` so ``schedule_flow()`` is called after every invocation.

    ``schedule_flow()`` is called whether ``fn`` returns normally or raises an
    ``Exception``; in the failure case the original exception is re-raised
    afterwards.

    Args:
        fn: The callable to wrap.

    Returns:
        A wrapper with ``fn``'s metadata that returns whatever ``fn`` returns.
    """
    # Imported here because ``functools`` is not among the snippet's
    # module-level imports; without it the decorator fails with NameError.
    import functools

    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        try:
            ret = fn(*args, **kwargs)
        except Exception:
            # Still queue the next run when this one failed, then propagate.
            # Deliberately ``except Exception`` rather than ``finally`` so
            # BaseException subclasses do not trigger a reschedule.
            schedule_flow()
            raise
        schedule_flow()
        return ret

    return decorated
Then your flow could look something like:
@flow
@realtime_flow
def my_cool_flow():
    """Example flow with an empty body, wrapped by ``realtime_flow``."""
    # NOTE(review): ``flow`` is not among the imports shown in this snippet;
    # it needs to be in scope for the decorator above to resolve.
    return None
Cole Murray
09/21/2022, 7:56 PM
Sam Garvis
09/21/2022, 8:24 PM
Faheem Khan
09/21/2022, 11:00 PM