Hi, is there any way to register a single Prefect ...
# ask-community
c
Hi, is there any way to register a single Prefect flow to run different functions on different schedules? (details in thread)
For example, I am looking to run reports at different frequencies: • Daily at 9am • Daily at 6pm • Weekly at 12pm I have all the boilerplate code written that tells Prefect how to execute my code (using ECS / S3). For the above 3 schedules, I need to create 3 files
daily_9am
,
daily_6pm
,
weekly_12pm
, and copy paste the boilerplate code for 3 scripts, while just changing the
CronSchedule
expression
Copy code
from funcs import \
    func_one,
    func_two

import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow, Parameter
from prefect.schedules import CronSchedule

logger = prefect.context.get('logger')
daily_9am = CronSchedule("0 1 * * 6") << Changed this line of code only for 3 files

S3_STORAGE = S3(bucket="aws-ecs-flows", stored_as_script=True)
ECS_RUN_CONFIG = ECSRun(image="<http://xxxxxxxxxxx.dkr.ecr.xxxxxxx.amazonaws.com/xxxxxxxx:latest|xxxxxxxxxxx.dkr.ecr.xxxxxxx.amazonaws.com/xxxxxxxx:latest>", labels=['ecs'])

FLOW_NAME = "Reports"

reports = {
    'func_one': func_one,
    'func_two': func_two
}

@task()
def report(report_name):
    report_name.run()
    <http://logger.info|logger.info>(f"Executed report {report_name}.")

with Flow(
    FLOW_NAME,
    storage=S3_STORAGE,
    run_config=ECS_RUN_CONFIG,
    schedule=daily_9am
) as flow:

    report_number = Parameter('report_number', default=list(reports.keys()))
    report_name = task(lambda x: reports[x]).map(report_number)

    report.map(report_name)
Is there any way to use a single flow to run multiple schedules? Perhaps a function that returns different
CronSchedules
based on the name of functions passed in?
Copy code
daily_9am = CronSchedule("0 9 * * *")
daily_6pm = CronSchedule("0 18 * * *")
weekly_12pm = CronSchedule("0 12 * * 6")

def scheduler(func):
  if func == func_one:
    return daily_9am
  elif func == func_two:
    return daily_6pm
  elif func == func_three:
    return weekly_12pm
But how would I fit this into my Prefect Flow?
Copy code
with Flow(..., schedule=scheduler(func)) as flow:
  ...
b
Hi CA, I'm not sure about the Python API, but we use this via the GraphQL interface: you can set a list of
schedules
(with different parameters if you like) for a
flow_group
. In the UI you can go to the flow, then settings (top right) > schedules, and click 'new schedules' as often as you like.
c
Ahh yes, I can see how the UI option makes sense ! Appreciate if anyone has any insight on how to write this as Python code ?
I can click on New Schedule here in the UI, but I'd like to be explicit about my intent in my code as well
b
You can add multiple clocks to a single flow.
And each clock can have alternative parameters
b
I think in the Python API the semantics are: a
Flow
has a (default)
Schedule
which can have one or more `Clock`s: https://docs.prefect.io/api/0.14.22/schedules/schedules.html#schedule
So
daily_9am = CronSchedule("0 1 * * 6")
could be something like
Copy code
my_flow_schedule = Schedule(clocks=[
    CronClock("0 9 * * *"), 
    CronClock("0 18 * * *"),
    CronClock("0 12 * * 6")
])
k
Yes this is right. You can use a Schedule that is composed of multiple clocks, and each of those clocks could hold parameters that use the appropriate function to generate a report. Parameters only take JSON so you would need to pass in a string, and than have a task return the appropriate function.