# ask-community
Tilak Maddy
Hey everyone, I have a flow that has required parameters, and I have a problem... I want to be able to programmatically add schedule clocks with default parameters. I am able to do it through the UI; however, when it comes to the GraphQL solution (https://docs.prefect.io/orchestration/flow-runs/scheduling.html#creating-flow-schedules), the biggest obstruction is that setting flow group schedules will remove any existing schedules. I would like to have
add_flow_group_schedule
instead of just
set_flow_group_schedule
(just like we have in the UI ~ the New Schedule button) -> How does it manage to do it?
Anna Geller
@Tilak Maddy that’s correct, the mutation
set_flow_group_schedule
can be interpreted as create or replace a flow group schedule. This means that if you want to only add new clocks, you would have to first query for the existing clocks:
Copy code
query {
  flow_group (where: {id: {_eq: "b7075fcf-3cc4-4d7e-9978-40176417394e"}}) {
    schedule
    default_parameters
  }
}
And then modify your mutation based on that output and rerun
set_flow_group_schedule
. A more practical way of modifying clocks with
parameter_defaults
attached to them would be to do it directly in your flow configuration and re-register your flow, e.g.:
Copy code
import datetime

import prefect
from prefect import task, Flow, Parameter
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 2"},
)

@task
def log_param(p):
    logger = prefect.context.get("logger")
    logger.info("Received parameter value {}".format(p))

with Flow("Varying Parameters", schedule=Schedule(clocks=[clock1, clock2])) as flow:
    p = Parameter("p", default=None, required=False)
    log_param(p)
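If you do go the query-then-set route anyway, the merge step itself is just list manipulation. A minimal sketch, assuming a clock shape like the one returned by the flow_group schedule query above (the `merge_clocks` helper and the exact dictionary keys are assumptions; inspect your own query output and the `set_flow_group_schedule` input fields in the linked docs to confirm):

```python
def merge_clocks(existing_clocks, new_clock):
    """Append new_clock to the clocks fetched from the flow_group query,
    skipping it if an identical clock is already present, so that passing
    the merged list back to set_flow_group_schedule keeps all previous
    schedules instead of replacing them."""
    existing_clocks = existing_clocks or []
    if new_clock in existing_clocks:
        return list(existing_clocks)
    return list(existing_clocks) + [new_clock]

# Example clocks in an assumed shape (cron expression + parameter defaults):
current = [{"cron": "0 * * * *", "parameter_defaults": {"p": "CLOCK 1"}}]
updated = merge_clocks(
    current,
    {"cron": "30 * * * *", "parameter_defaults": {"p": "CLOCK 2"}},
)
print(len(updated))  # 2
```

The merged list is what you would then send back in the `set_flow_group_schedule` mutation, since that mutation replaces the whole schedule.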
Tilak Maddy
Hey @Anna Geller, I understood that. Now I'd like to know how I can make the following query:
• Fetch the start_time, end_time, and state of some of the recent flow runs generated by a particular schedule clock (given that no two schedule clocks will have the same parameter_defaults)
I did find this resource - https://docs.prefect.io/orchestration/flow-runs/inspection.html#querying-for-a-single-flow-run - however, I can't figure out how to query all the flow run IDs generated by a particular clock in order to actually query the single flow run.
Kevin Kho
I don’t believe this can be done, because clocks don’t have any identifier. If the clocks don’t have overlapping parameter defaults, I think the thing to do here is to query by the parameters of the flow run instead:
Copy code
from prefect import Client

client = Client()
response = client.graphql(
    """
    query($params: jsonb) {
        flow_run(where: {parameters: {_contains: $params}}) {
            name
            id
            parameters
        }
    }
    """,
    variables={"params": {"x": 1}},
)

print(response)
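To also pull the fields asked about above, the same query can be extended with additional selections; start_time, end_time, and state are standard fields on flow_run in the Prefect 1 GraphQL schema (a sketch, not tested against a live backend - pass this string to `Client().graphql(...)` with the same `variables` as in the snippet above):

```python
# Extended query string selecting run timing and state alongside the
# parameter filter; use with variables={"params": {...}} as before.
QUERY = """
query($params: jsonb) {
    flow_run(where: {parameters: {_contains: $params}}) {
        id
        start_time
        end_time
        state
        parameters
    }
}
"""
print("start_time" in QUERY)  # True
```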
Tilak Maddy
Thanks, will try it
Hey guys @Kevin Kho @Anna Geller, one more thing - when it comes to adding a schedule, I am afraid I can't scale the solution, because I plan to add a schedule clock with default_params every time a user on our website creates an ETL pipeline. I'd then have to download all the existing schedules to my server, add another schedule, and send them all back. What happens when we get to like 100 pipelines? Or 1000? 10000?
Kevin Kho
Beyond the parameter discussion, there is a limitation in that a single Flow can only schedule a maximum of 10 concurrent runs. Check this thread. He has 13 clocks with different parameters on 1 Flow; only 10 get scheduled and 3 are dropped. There is an explanation there, but the quick answer is that Prefect runs the scheduler on a 150-second interval and it schedules a maximum of 10 runs per Flow. So the moment you have more than 10 scheduled runs in a 150-second window, you will start to see Flow runs not being triggered. Attaching 1000 or 10000 clocks is really not going to work even if you had a good interface. You really need to register new flows with the new parameter sets
Tilak Maddy
You really need to register new flows with the new parameter sets
But since the flow code is going to be exactly the same, what would you suggest I do? I use GitHub storage and the Vertex agent. For now I only have one main flow file (obviously with just one name) that lies as 'storage' in the repo, and the execution environment is a Docker image that the Vertex agent pulls from Docker Hub.
How do I register multiple flows?
Kevin Kho
I am only half sure this will work, but I think you can try:
1. Create a function to build a Flow
2. Make this function take in default params
3. Register the flow with the default params
4. At this point, there will be a difference between the GitHub-stored code and a newly registered Flow with different params. It might be OK as long as it's just the params changing and the Flow structure remains the same. Can try on Monday
If this doesn’t work, I would just change to GCS storage and have a copy uploaded during registration, because with GitHub storage you’re still going to have to re-register all the flows each time you make a change anyway, so moving to GCS storage doesn’t really change the maintenance effort. In both cases you will change the file in GitHub and re-register all the flows for the changes to take effect
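Kevin's flow-factory idea can be sketched without any Prefect specifics; the `build_flow_config` helper and its field names are hypothetical, and the actual `prefect.Flow` construction and `flow.register(...)` call are only indicated in comments since they need a Prefect backend:

```python
import datetime

def build_flow_config(pipeline_name, parameter_defaults, interval_minutes=60):
    """Assemble the configuration for one user pipeline.

    In real code this function would build a prefect.Flow with a unique
    name, attach an IntervalClock carrying parameter_defaults, and then
    call flow.register(project_name=...). Here we only assemble the data
    to illustrate the one-registration-per-pipeline idea.
    """
    return {
        # A unique flow name per pipeline avoids overwriting earlier
        # registrations of the same underlying flow code.
        "name": "etl-{}".format(pipeline_name),
        "clock": {
            "start_date": datetime.datetime.utcnow().isoformat(),
            "interval_minutes": interval_minutes,
            "parameter_defaults": parameter_defaults,
        },
    }

# One registration per user pipeline, all sharing the same flow code:
specs = [
    build_flow_config("user-1", {"source": "a"}),
    build_flow_config("user-2", {"source": "b"}),
]
print([s["name"] for s in specs])  # ['etl-user-1', 'etl-user-2']
```

Because each pipeline becomes its own registered Flow, each one gets its own 10-run scheduling budget instead of all clocks competing on a single Flow.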
Tilak Maddy
Will Prefect offer first-class support for this kind of situation in the near future?
Anna Geller
@Tilak Maddy are you asking whether Prefect will support adding more than 10 clocks to a single flow, or adding/appending new clocks to a list of clocks in general? Feel free to open a GitHub issue for this so that the engineers can give their opinion on whether this is doable in current Prefect. I think what you're trying to do will be easier to accomplish in Orion, because you could create a separate deployment for each “clock” with specific parameter values. As a workaround, I think you could leverage the
create_flow_run
task in a parent flow. So you would register your user's pipeline as a normal, independent flow first, but any time the user wants to add a new pipeline (i.e. the same flow, just with different parameters), instead of modifying the existing flow or appending clocks you would register a new parent flow that triggers this child flow with different parameters, and schedule it to run at a specific time. This has the effect you want without any concurrency issues and without modifying the clocks. And as for registering multiple flows, you can use the “prefect register” CLI to do that.
Kevin Kho
Likely not first-class support. The number 10 is not necessarily about concurrent clocks. If you have a Flow that runs every day, the maximum of 10 refers to how many runs are scheduled at a time, meaning you will have up to 10 Flow runs scheduled in the future. So if we increased this number from 10 to 20, we would double the number of scheduled Flow runs for every registered Flow in Prefect, across all users.