CA Lee
08/30/2021, 7:08 AMCA Lee
08/30/2021, 7:12 AMdaily_9am
, daily_6pm
, weekly_12pm
, and copy paste the boilerplate code for 3 scripts, while just changing the CronSchedule
expression
from funcs import \
func_one,
func_two
import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow, Parameter
from prefect.schedules import CronSchedule
logger = prefect.context.get('logger')
daily_9am = CronSchedule("0 1 * * 6") << Changed this line of code only for 3 files
S3_STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
ECS_RUN_CONFIG = ECSRun(image="<http://xxxxxxxxxxx.dkr.ecr.xxxxxxx.amazonaws.com/xxxxxxxx:latest|xxxxxxxxxxx.dkr.ecr.xxxxxxx.amazonaws.com/xxxxxxxx:latest>", labels=['ecs'])
FLOW_NAME = "Reports"
reports = {
'func_one': func_one,
'func_two': func_two
}
@task()
def report(report_name):
report_name.run()
<http://logger.info|logger.info>(f"Executed report {report_name}.")
with Flow(
FLOW_NAME,
storage=S3_STORAGE,
run_config=ECS_RUN_CONFIG,
schedule=daily_9am
) as flow:
report_number = Parameter('report_number', default=list(reports.keys()))
report_name = task(lambda x: reports[x]).map(report_number)
report.map(report_name)
CA Lee
08/30/2021, 7:20 AMCronSchedules
based on the name of functions passed in?
daily_9am = CronSchedule("0 9 * * *")
daily_6pm = CronSchedule("0 18 * * *")
weekly_12pm = CronSchedule("0 12 * * 6")
def scheduler(func):
if func == func_one:
return daily_9am
elif func == func_two:
return daily_6pm
elif func == func_three:
return weekly_12pm
But how would I fit this into my Prefect Flow?
with Flow(..., schedule=scheduler(func)) as flow:
...
Bouke Krom
08/30/2021, 7:53 AMschedules
(with different parameters if you like) for a flow_group
. In the UI you can go to the flow, then settings (top right) > schedules, and click 'new schedules' as often as you like.CA Lee
08/30/2021, 9:26 AMCA Lee
08/30/2021, 9:27 AMBen Muller
08/30/2021, 9:31 AMBen Muller
08/30/2021, 9:32 AMBouke Krom
08/30/2021, 12:04 PMFlow
has a (default) Schedule
which can have one or more `Clock`s: https://docs.prefect.io/api/0.14.22/schedules/schedules.html#scheduleBouke Krom
08/30/2021, 12:07 PMdaily_9am = CronSchedule("0 1 * * 6")
could be something like
my_flow_schedule = Schedule(clocks=[
CronClock("0 9 * * *"),
CronClock("0 18 * * *"),
CronClock("0 12 * * 6")
])
Kevin Kho