https://prefect.io logo
Title
a

Alexandru Anghel

11/04/2022, 8:34 PM
Hi guys, What's the proper way of running a one time only schedule flow in Orion? I can't find the DatesClock class anymore in Orion. In Prefect 1.X i was using something like this to start the flow 10 seconds after the registration:
def get_schedule(schedule: str):
    return Schedule(clocks=[DatesClock([pendulum.now().add(seconds=10)])]) if schedule == "one time" else CronSchedule(cron=schedule)
Thanks!
1
m

Mason Menges

11/04/2022, 9:07 PM
Hey @Alexandru Anghel I'm not sure I follow exactly what you're wanting to accomplish here, i.e. what's the Purpose behind scheduling a flow to run only once? That said you could potentially use rrule schedules to do this
a

Alexandru Anghel

11/04/2022, 9:24 PM
Hey @Mason Menges, thanks for replying! I am loading some historical data using hard coded start and end dates, so i need some flows to run only once. I am looking now at rrules, never used it before.
m

Mason Menges

11/04/2022, 10:00 PM
You might find some use with this functionality for 2.0 then, https://github.com/PrefectHQ/prefect/blob/main/src/prefect/deployments.py#L40 run deployment allows you to specify the start time and parameters for a flow run deployment directly rather than through the schedule 😄
a

Alexandru Anghel

11/04/2022, 10:13 PM
Thanks, i wil have a look. I tried with rrules but i get this error:
Invalid RRule string "FREQ=DAILY;COUNT=1;DTSTART=20221104T220851Z": unknown parameter 'DTSTART' (type=value_error)
Edit: Nevermind, i've added by mistake dtstart in the rrule string
I don't think rrules are helpful for me. From what i understand it's good for repeated schedules, which is not my case.
One workaround would be something like this and hope i won't be around for the next run 😃
def get_schedule(schedule: str):
    dtstart = datetime.now() + timedelta(0,10)
    return IntervalSchedule(interval=timedelta(days=9999), anchor_date = dtstart, timezone='UTC') if schedule == "one time" else CronSchedule(cron=schedule, timezone='UTC')
Going back to your proposal of setting the parameters directly to the flow run, i'm not sure this will work for my use case. I'm using dask runner and relying heavily on flow templates. Finally, i'm declaring the flows through kubernetes manifests. It would be really helpful to be able to specify a particular timestamp directly to the schedule, like we used to have in Prefect 1.x.
n

Nate

11/05/2022, 12:35 AM
Hi @Alexandru Anghel You could use
run_deployment
in your CI/CD (which is not a task so it doesn't need to be in a flow) to trigger a given deployment some amount of time after the deployment is applied
from prefect.deployments import run_deployment

...
my_deployment.apply()

sleep(10)

run_deployment(name="myDeployment", params=dict(...))
a

Alexandru Anghel

11/07/2022, 11:11 AM
Thank you, @Nate! This is indeed a better solution.
The only downside of this is that the pod that launches the deployment is staying in Running state until the dask pods complete their tasks.
n

Nate

11/07/2022, 2:54 PM
You can pass
run_deployment(name="xxx/xxx", timeout=0)
to avoid waiting for the referenced deployment to run
🙌 1