# ask-community
b
Hi there! I have a shared task that I would like to run from a bunch of different flows. The only differences between the flows are the flow names, the schedules, plus a single parameter. Is this a reasonable way to accomplish this?
from prefect import Flow, Parameter, task
from prefect.schedules import CronSchedule


@task
def my_task(my_parameter):
    print(f"my_parameter value is {my_parameter}")


with Flow("my-flow-1", schedule=CronSchedule("0 * * * *")) as flow1:
    param = Parameter("my_parameter", default="flow-1-parameter-value")
    my_task(param)

with Flow("my-flow-2", schedule=CronSchedule("1 * * * *")) as flow2:
    param = Parameter("my_parameter", default="flow-2-parameter-value")
    my_task(param)
k
Hey @Billy McMonagle, a schedule can be built with multiple clocks, and each clock can take its own parameters and labels. So one option is a single flow whose schedule has multiple clocks, though that won't let you have different flow names. Otherwise, you can create a function that takes the schedule, flow name, and parameter value, builds the flow, and registers it.
def register_flow(schedule, name, param_value):
    # Build one flow per (schedule, name, parameter value) combination
    with Flow(name, schedule=schedule) as flow:
        param = Parameter("my_parameter", default=param_value)
        my_task(param)
    # `project` is the Prefect project name, defined elsewhere
    flow.register(project_name=project)
Then you can just loop through the combinations and call this function, right? You can also return the Flow object from the function instead of registering it after the with block.
b
Hi @Kevin Kho yes that makes sense. I will probably set up another module or configuration file that has the flow name, schedule, and parameter value, and then have a "flow factory" like you suggest. For now, all flows will have the same run config, storage, etc. on this project. Thank you!
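A configuration module plus "flow factory" loop along those lines might be sketched as follows. `FlowSpec`, `FLOW_SPECS`, `build_all`, and `describe` are hypothetical names, and the factory is passed in as a plain callable so the snippet runs without Prefect installed. In a real project the factory would be a function like register_flow above.

```python
from dataclasses import dataclass
from typing import Callable, List, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class FlowSpec:
    """One row of configuration: everything that differs between flows."""
    name: str
    cron: str
    param_value: str


# Hypothetical configuration; this could equally be loaded from a YAML or JSON file.
FLOW_SPECS = [
    FlowSpec("my-flow-1", "0 * * * *", "flow-1-parameter-value"),
    FlowSpec("my-flow-2", "1 * * * *", "flow-2-parameter-value"),
]


def build_all(specs: List[FlowSpec], factory: Callable[[FlowSpec], T]) -> List[T]:
    """Call the factory once per spec and collect whatever it returns."""
    return [factory(spec) for spec in specs]


def describe(spec: FlowSpec) -> str:
    # Stand-in factory for illustration; a real one would build and
    # register (or return) a Prefect Flow from the spec.
    return f"{spec.name} @ {spec.cron} -> {spec.param_value}"


summaries = build_all(FLOW_SPECS, describe)
```

Keeping the specs as data means adding a flow is a one-line change, and the shared run config and storage stay in the factory.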
l
Hi @Kevin Kho thank you very much for the register flow example, it was exactly what I was looking for. Since I was used to creating multiple Airflow DAGs through a for loop, I was looking for something similar here. I wasn't able to figure it out with flow.run(), and it seems I have to use flow.register() instead. And if I understand correctly, flow.run() is unnecessary when using a scheduler, since the flow will be run by the scheduler instead.
k
Yes, flow.run() is just for local testing.