Catch Up CronClock: I wanted to use the CronClock...
# ask-community
n
Catch Up CronClock: I wanted to use the CronClock to run a flow at the same time every day. I also want it to catch up if the start date and after date are before today. For example, with start_date=5/25/2021, after_date=5/30/2021 and today=6/2/2021, I would get a schedule to run on 5/31, 6/1 and 6/2. These would all be processed ASAP and then the flow would wait until 6/3 to run again. I’m having 2 problems with this.
1. I don’t see how to pass `after` into the schedule as part of `flow.run()`. I have hacked `flow._run()` to support this.
2. Something past my Python knowledge is causing `CronClock.events()` to do something weird when the yield returns. This causes the execution to drop directly out of the while loop and exit the method. Each new call to `schedule.next()` winds up creating a new `croniter` and running the event for the same start date again. If I build the clock directly and call next on the iterable returned by `events()`, it works as I would expect.
This works as expected:
import pendulum
from prefect.schedules.clocks import CronClock

clock = CronClock(cron="*/15 * * * *", start_date=pendulum.datetime(2021, 4, 30))
after = pendulum.datetime(2021, 5, 30, 23, 30, tz="UTC")
for i, e in enumerate(clock.events(after=after)):
    print(e.start_time)
    if e > pendulum.now():
        break
Output:
2021-05-30T23:45:00+00:00
2021-05-31T00:00:00+00:00
…
2021-06-01T22:00:00+00:00
2021-06-01T22:15:00+00:00
Hacks to flow._run()
def _run(
        self,
        parameters: Dict[str, Any],
        runner_cls: type,
        run_on_schedule: bool = True,
        **kwargs: Any,
    ) -> "prefect.engine.state.State":
        after = kwargs.pop("schedule_after", None)
...
        # determine time of first run
        try:
            if run_on_schedule and self.schedule is not None:
                next_run_event = self.schedule.next(1, return_events=True, after=after)[0]
                next_run_time = next_run_event.start_time  # type: ignore
                parameters = base_parameters.copy()
                parameters.update(next_run_event.parameter_defaults)  # type: ignore
            else:
                next_run_time = pendulum.now("utc")
        except IndexError:
            raise ValueError("Flow has no more scheduled runs.") from None
...
            try:
                if run_on_schedule and self.schedule is not None:
                    next_run_event = self.schedule.next(1, return_events=True, after=after)[0]
                    next_run_time = next_run_event.start_time  # type: ignore
                    parameters = base_parameters.copy()
                    parameters.update(next_run_event.parameter_defaults)  # type: ignore
                else:
                    break
            except IndexError:
                # Handle when there are no more events on schedule
                break
Pass `schedule_after` as an argument to `flow.run()`, extract it from kwargs in `_run()` and pass it to `schedule.next()`.
When I run the flow for real, I added some try/except/finally blocks to see what is executed after the `yield` statement:
        try:
            while True:
                next_date = pendulum.instance(cron.get_next(datetime))
                # because of croniter's rounding behavior, we want to avoid
                # issuing the after date; we also want to avoid duplicates caused by
                # DST boundary issues
                if next_date.in_tz("UTC") == after.in_tz("UTC") or next_date in dates:
                    next_date = pendulum.instance(cron.get_next(datetime))

                if self.end_date and next_date > self.end_date:
                    break
                dates.add(next_date)
                try:
                    yield ClockEvent(
                        start_time=next_date,
                        parameter_defaults=self.parameter_defaults,
                        labels=self.labels,
                    )
                finally:
                    print("----> Inner finally")
                print("----> exit yield")
            print("----> exit events")
        except Exception as exc:
            print("----> exception")
        finally:
            print("----> finally")
I get the following output, which doesn’t look like it is just exiting the `while True` loop after the yield.
----> Enter events
----> Inner finally
----> finally
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'Hello'
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Scheduled start time: 2021-05-30T23:31:00+00:00
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
----> Enter events
----> Inner finally
----> finally
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'Hello'
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Scheduled start time: 2021-05-30T23:31:00+00:00
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
c
Hey Nathan, two notes:
• At the moment, backfills like the one you are running here fall under the category of “ad hoc runs” in Prefect.
• `flow.run` is considered a local testing interface, and as such we don’t plan to expand upon the scheduling interface of this particular method call.
Combining these two comments, the only thing that running an “old” run does is set the `scheduled_start_time` of the run in `prefect.context` to an older date (note: this means that for a backfill run to be meaningful, you need to be relying on this attribute in your task logic somehow). The simplest way to achieve this is to set all your task contexts to have an old date for this attribute:
for old_date in old_dates:
    task_ctxs = dict.fromkeys(flow.tasks, {"scheduled_start_time": old_date})
    flow.run(run_on_schedule=False, task_contexts=task_ctxs)
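The `old_dates` list in the snippet above is not defined in the thread. Here is a minimal sketch of one way to build it for the daily case described in the question, using only the standard library. The helper name `catch_up_dates` and its `hour` parameter are hypothetical and not part of Prefect, and a real backfill would more likely derive the dates from the cron expression itself.

```python
from datetime import datetime, timedelta, timezone


def catch_up_dates(after, now, hour=0):
    """Return daily run times strictly after `after` and no later than `now`.

    Hypothetical helper: assumes one run per day at `hour`:00 UTC.
    """
    candidate = after.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= after:
        # The run for the `after` day itself is excluded, mirroring an
        # exclusive "after" bound.
        candidate += timedelta(days=1)
    dates = []
    while candidate <= now:
        dates.append(candidate)
        candidate += timedelta(days=1)
    return dates


# Dates from the question: after=5/30/2021, today=6/2/2021.
after = datetime(2021, 5, 30, tzinfo=timezone.utc)
now = datetime(2021, 6, 2, 12, tzinfo=timezone.utc)
old_dates = catch_up_dates(after, now)

for d in old_dates:
    print(d.isoformat())
```

With these inputs the list holds the three missed runs (5/31, 6/1 and 6/2), and the next regular run would be 6/3.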
n
Thanks Chris, I had found the scheduled_start_time in the context. I’ll run this from the UI and rethink the backfill as ad hoc. Any Python insights as to why the yield statement exits the method directly?
Never mind, a walk to the grocery gave me time to sort it out in my mind.
👍 1
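The thread never spells out the answer to the `yield` question. One likely explanation is standard Python generator behaviour rather than anything specific to CronClock: when a consumer takes only the first item from a generator and then discards it, Python closes the generator by raising GeneratorExit at the paused `yield`. The `finally` blocks run, `except Exception` does not fire (GeneratorExit derives from BaseException), and the code after the `yield` is never reached. The sketch below reproduces the pattern with plain generators. `events` and `next_events` are hypothetical stand-ins, and it assumes `schedule.next()` builds a fresh generator on each call and consumes only the first `n` events, which was not confirmed in the thread.

```python
import itertools

log = []


def events(after):
    """Hypothetical stand-in for a clock's events() generator."""
    log.append("enter events")
    try:
        n = after
        while True:
            n += 1
            try:
                yield n
            finally:
                log.append("inner finally")
            # Only reached if the consumer asks for another event.
            log.append("exit yield")
    except Exception:
        # GeneratorExit is a BaseException, so it does not land here.
        log.append("exception")
    finally:
        log.append("finally")


def next_events(n, after):
    """Hypothetical stand-in for schedule.next(): take n events, then stop."""
    gen = events(after)
    try:
        return list(itertools.islice(gen, n))
    finally:
        # Closing raises GeneratorExit inside the generator at its yield.
        gen.close()


first = next_events(1, after=0)   # [1]
second = next_events(1, after=0)  # [1] again: same `after`, same event
third = next_events(1, after=first[0])  # [2]: `after` was advanced

print(first, second, third)
print(log)
```

Because the same `after` is passed on every call, every call returns the same first event, which matches the repeated scheduled start time in the logs above. Advancing `after` to the last returned event moves the schedule forward.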