Thread
#prefect-community
    Billy McMonagle

    1 year ago
    Hi there! I have a shared task that I would like to run from a bunch of different flows. The only differences between the flows are the flow names, the schedules, plus a single parameter. Is this a reasonable way to accomplish this?
    from prefect import Flow, Parameter, task
    from prefect.schedules import CronSchedule
    
    
    @task
    def my_task(my_parameter):
        print(f"my_parameter value is {my_parameter}")
    
    
    with Flow("my-flow-1", schedule=CronSchedule("0 * * * *")) as flow1:
        param = Parameter("my_parameter", default="flow-1-parameter-value")
        my_task(param)
    
    with Flow("my-flow-2", schedule=CronSchedule("1 * * * *")) as flow2:
        param = Parameter("my_parameter", default="flow-2-parameter-value")
        my_task(param)
    Kevin Kho

    1 year ago
    Hey @Billy McMonagle, a schedule can be built with multiple clocks, and a Clock can take in parameters and labels. So one potential way is just having one flow with a schedule with multiple clocks. This won’t let you have different flow names. Other than that, you can just create a function that takes in the schedule, flow_name, and param and then register it.
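    def register_flow(schedule, name, param_value, project_name):
        # Build the flow from the arguments rather than hard-coded values.
        with Flow(name, schedule=schedule) as flow:
            param = Parameter("my_parameter", default=param_value)
            my_task(param)
        # Register after the with block; project_name must be an existing project.
        flow.register(project_name=project_name)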
    Then you can just loop through the combinations and call this function, right? Instead of registering inside the function, you can also return the Flow object after the with block and register it elsewhere.
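For the single-flow option mentioned above, here is a minimal sketch of one schedule built from multiple clocks, assuming the Prefect 1.x API used in this thread (`Schedule`, `CronClock`, and its `parameter_defaults` argument). `CLOCK_SPECS` and `build_schedule` are hypothetical names for illustration, and the Prefect imports are deferred into the function so the spec list can be inspected on its own.

```python
# Hypothetical (cron string, parameter value) pairs, taken from the example flows above.
CLOCK_SPECS = [
    ("0 * * * *", "flow-1-parameter-value"),
    ("1 * * * *", "flow-2-parameter-value"),
]


def build_schedule():
    """Return one Schedule in which each clock carries its own parameter default."""
    # Assumes Prefect 1.x is installed; imported lazily on purpose.
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    clocks = [
        CronClock(cron, parameter_defaults={"my_parameter": value})
        for cron, value in CLOCK_SPECS
    ]
    return Schedule(clocks=clocks)
```

The result could then be passed as `Flow("my-flow", schedule=build_schedule())`. As noted above, this yields a single flow name for every run, so it only fits if separate flow names are not required.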
    Billy McMonagle

    1 year ago
    Hi @Kevin Kho, yes, that makes sense. I will probably set up another module or configuration file that has the flow name, schedule, and parameter value, and then have a "flow factory" like you suggest. For now, all flows on this project will have the same run config, storage, etc. Thank you!
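A rough sketch of what such a configuration-driven "flow factory" might look like, again assuming Prefect 1.x. `FlowSpec`, `FLOW_SPECS`, `build_flow`, and `register_all` are hypothetical names, not part of Prefect, and the task is redefined inside the factory only to keep the sketch self-contained.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowSpec:
    """One row of configuration: flow name, cron schedule, parameter default."""
    name: str
    cron: str
    param_value: str


# Hypothetical configuration; this could equally live in its own module or file.
FLOW_SPECS = [
    FlowSpec("my-flow-1", "0 * * * *", "flow-1-parameter-value"),
    FlowSpec("my-flow-2", "1 * * * *", "flow-2-parameter-value"),
]


def build_flow(spec):
    """Build and return (without registering) one flow from a FlowSpec."""
    # Assumes Prefect 1.x is installed; imported lazily on purpose.
    from prefect import Flow, Parameter, task
    from prefect.schedules import CronSchedule

    @task
    def my_task(my_parameter):
        print(f"my_parameter value is {my_parameter}")

    with Flow(spec.name, schedule=CronSchedule(spec.cron)) as flow:
        param = Parameter("my_parameter", default=spec.param_value)
        my_task(param)
    return flow


def register_all(project_name):
    """Register every configured flow under an existing project."""
    for spec in FLOW_SPECS:
        build_flow(spec).register(project_name=project_name)
```

Returning the flow from `build_flow` keeps building separate from registering, so the same factory can serve a local `flow.run()` test as well as registration.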
    Luuk

    1 year ago
    Hi @Kevin Kho, thank you very much for the register flow example, it was exactly what I was looking for. Since I was used to creating multiple Airflow DAGs through a for loop, I was looking for something similar here. I wasn't able to figure it out with flow.run(); it seems I had to use flow.register() instead. And if I understand correctly, flow.run() is unnecessary when using a scheduler, since the flow will be run by the scheduler.
    Kevin Kho

    1 year ago
    Yes, flow.run() is just for local testing.