Nathan Atkins
06/01/2021, 10:11 PM
after into the schedule as part of flow.run(). I have hacked flow._run() to support this.
2. Something past my Python knowledge is causing CronClock.events() to do something weird when the yield returns. This causes the execution to drop directly out of the while loop and exit the method. Each new call to schedule.next() winds up creating a new croniter and running the same event start_date again.
If I build the clock directly and call next on the iterable returned by events(), it works as I would expect.
Nathan Atkins
06/01/2021, 10:13 PM
import pendulum
from prefect.schedules.clocks import CronClock

clock = CronClock(cron="*/15 * * * *", start_date=pendulum.datetime(2021, 4, 30))
after = pendulum.datetime(2021, 5, 30, 23, 30, tz="UTC")
# Iterate over a single events() generator and stop once an event is in the future
for i, e in enumerate(clock.events(after=after)):
    print(e.start_time)
    if e > pendulum.now():
        break
2021-05-30T23:45:00+00:00
2021-05-31T00:00:00+00:00
…
2021-06-01T22:00:00+00:00
2021-06-01T22:15:00+00:00
Nathan Atkins
06/01/2021, 10:17 PM
def _run(
    self,
    parameters: Dict[str, Any],
    runner_cls: type,
    run_on_schedule: bool = True,
    **kwargs: Any,
) -> "prefect.engine.state.State":
    # Hack: optional starting point for the schedule, passed in through flow.run()
    after = kwargs.pop("schedule_after", None)
    ...
    # determine time of first run
    try:
        if run_on_schedule and self.schedule is not None:
            next_run_event = self.schedule.next(1, return_events=True, after=after)[0]
            next_run_time = next_run_event.start_time  # type: ignore
            parameters = base_parameters.copy()
            parameters.update(next_run_event.parameter_defaults)  # type: ignore
        else:
            next_run_time = pendulum.now("utc")
    except IndexError:
        raise ValueError("Flow has no more scheduled runs.") from None
    ...
    # get the next scheduled run (excerpt; the break statements belong to a loop elided above)
    try:
        if run_on_schedule and self.schedule is not None:
            next_run_event = self.schedule.next(1, return_events=True, after=after)[0]
            next_run_time = next_run_event.start_time  # type: ignore
            parameters = base_parameters.copy()
            parameters.update(next_run_event.parameter_defaults)  # type: ignore
        else:
            break
    except IndexError:
        # Handle when there are no more events on schedule
        break
Pass schedule_after as an argument to flow.run(), extract it from kwargs in _run(), and pass it to schedule.next() as after.
Nathan Atkins
06/01/2021, 10:20 PM
yield statement with some try/except/finally blocks
try:
    while True:
        next_date = pendulum.instance(cron.get_next(datetime))
        # because of croniter's rounding behavior, we want to avoid
        # issuing the after date; we also want to avoid duplicates caused by
        # DST boundary issues
        if next_date.in_tz("UTC") == after.in_tz("UTC") or next_date in dates:
            next_date = pendulum.instance(cron.get_next(datetime))
        if self.end_date and next_date > self.end_date:
            break
        dates.add(next_date)
        try:
            yield ClockEvent(
                start_time=next_date,
                parameter_defaults=self.parameter_defaults,
                labels=self.labels,
            )
        finally:
            print("----> Inner finally")
        print("----> exit yield")
    print("----> exit events")
except Exception as exc:
    print("----> exception")
finally:
    print("----> finally")
I get the following output, which doesn’t look like it is just exiting the while True loop after the yield.
----> Enter events
----> Inner finally
----> finally
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'Hello'
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Scheduled start time: 2021-05-30T23:31:00+00:00
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
----> Enter events
----> Inner finally
----> finally
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'Hello'
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Scheduled start time: 2021-05-30T23:31:00+00:00
[2021-06-01 18:19:50-0400] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-01 18:19:50-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
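One possible reading of the output above, offered as a sketch and not as a diagnosis of Prefect itself: when a generator that is suspended at a yield gets closed or garbage-collected, Python raises GeneratorExit at that yield. The finally blocks run, but except Exception does not, because GeneratorExit derives from BaseException. The example below uses only the standard library. events() and next_event() are hypothetical stand-ins, not Prefect's CronClock.events() or schedule.next(), and the example assumes the caller builds a fresh generator on every call, takes one item, and discards it.

```python
import itertools

log = []  # records the order in which the generator's blocks run


def events():
    """Stand-in for a clock's events() generator; yields 0, 1, 2, ..."""
    log.append("enter events")
    n = 0
    try:
        while True:
            try:
                yield n
            finally:
                log.append("inner finally")
            log.append("exit yield")  # reached only if the generator is resumed
            n += 1
    except Exception:
        # GeneratorExit is a BaseException, so closing never lands here
        log.append("exception")
    finally:
        log.append("finally")


def next_event():
    """Hypothetical caller: new generator per call, take one item, discard."""
    gen = events()
    try:
        return next(itertools.islice(gen, 1))
    finally:
        # Explicit close; garbage collection does the same to a suspended generator
        gen.close()


first = next_event()
first_log = list(log)
second = next_event()

# Holding on to one generator and calling next() on it advances normally
shared = events()
reused = [next(shared), next(shared), next(shared)]
shared.close()
```

Here first and second are both 0, and the log for each call reads enter events, inner finally, finally, with no exit yield and no exception, while the reused generator yields 0, 1, 2. If the real code behaves the same way, the repeated scheduled start time would come from restarting at the same after value on every call, not from the yield misbehaving.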
Chris White
flow.run is considered a local testing interface, and as such we don’t plan to expand upon the scheduling interface of this particular method call.
Combining these two comments, the only thing that running an “old” run does is set the scheduled_start_time of the run in prefect.context to an older date (note: this means that for a backfill run to be meaningful, you need to be relying on this attribute in your task logic somehow). The simplest way to achieve this is to set all your task contexts to have an old date for this attribute:
for old_date in old_dates:
    task_ctxs = dict.fromkeys(flow.tasks, {"scheduled_start_time": old_date})
    flow.run(run_on_schedule=False, task_contexts=task_ctxs)
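The loop above leaves old_dates undefined. A minimal sketch of one way to build it, using only the standard library, is shown below. quarter_hour_dates() is a hypothetical helper that stands in for the */15 cron schedule and assumes the start time is already aligned to a quarter hour; task_names is a placeholder for flow.tasks. The sketch also uses a dict comprehension so that each task gets its own context dict, since dict.fromkeys() gives every key the same dict object as its value.

```python
from datetime import datetime, timedelta, timezone


def quarter_hour_dates(start, end):
    """Hypothetical helper: every 15-minute mark in [start, end), oldest first."""
    dates = []
    current = start
    while current < end:
        dates.append(current)
        current += timedelta(minutes=15)
    return dates


old_dates = quarter_hour_dates(
    datetime(2021, 5, 30, 23, 45, tzinfo=timezone.utc),
    datetime(2021, 5, 31, 0, 30, tzinfo=timezone.utc),
)

task_names = ["say_hello"]  # placeholder for flow.tasks

runs = []
for old_date in old_dates:
    # One context dict per task, instead of one dict shared by every key
    task_ctxs = {name: {"scheduled_start_time": old_date} for name in task_names}
    runs.append(task_ctxs)
    # With a real flow, the call from the message above would go here:
    # flow.run(run_on_schedule=False, task_contexts=task_ctxs)
```

Sharing one dict across tasks is harmless as long as nothing mutates it; the comprehension simply removes that assumption.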
Nathan Atkins
06/02/2021, 1:44 PM
Nathan Atkins
06/02/2021, 2:31 PM