Tilak Maddy
12/11/2021, 11:34 AM
`add_flow_group_schedule` instead of just `set_flow_group_schedule` (just like the New Schedule button in the UI)? How does the UI manage to do it?
Anna Geller
`set_flow_group_schedule` can be interpreted as "create or replace" a flow group schedule. This means that if you only want to add new clocks, you first have to query for the existing clocks:
```graphql
query {
  flow_group(where: {id: {_eq: "b7075fcf-3cc4-4d7e-9978-40176417394e"}}) {
    schedule
    default_parameters
  }
}
```
Then modify your mutation based on that output and run `set_flow_group_schedule`.
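A minimal sketch of that read-modify-write cycle, using the Prefect 1.x `Client.graphql` method. The helper names (`merge_cron_clocks`, `add_cron_clock`), the shape of the stored `schedule` JSON, the `set_flow_group_schedule` input fields (`flow_group_id`, `cron_clocks` with `cron` and `parameter_defaults`) and its `success` return field are all assumptions here; check them against your server's GraphQL schema before relying on this. It only handles cron clocks and raises rather than silently dropping interval clocks.

```python
# Sketch: emulate an "add clock" operation on top of set_flow_group_schedule,
# which replaces the whole schedule. ASSUMPTIONS (verify in your GraphQL schema):
# stored cron clocks look like {"type": "CronClock", "cron": ..., "parameter_defaults": {...}}
# and the mutation input accepts flow_group_id + cron_clocks [{cron, parameter_defaults}].

QUERY = """
query($id: uuid!) {
  flow_group(where: {id: {_eq: $id}}) {
    schedule
  }
}
"""

MUTATION = """
mutation($input: set_flow_group_schedule_input!) {
  set_flow_group_schedule(input: $input) {
    success
  }
}
"""


def merge_cron_clocks(existing_schedule, new_cron, new_params):
    """Return the existing cron clocks plus one new clock, in mutation-input shape."""
    clocks = (existing_schedule or {}).get("clocks", [])
    if any(c.get("type") != "CronClock" for c in clocks):
        # Interval clocks are stored differently; refuse instead of silently dropping them.
        raise ValueError("non-cron clocks present; extend this sketch before using it")
    merged = [
        {"cron": c["cron"], "parameter_defaults": c.get("parameter_defaults") or {}}
        for c in clocks
    ]
    merged.append({"cron": new_cron, "parameter_defaults": new_params})
    return merged


def add_cron_clock(client, flow_group_id, new_cron, new_params):
    """Read the current schedule, append one clock, and write the full set back."""
    result = client.graphql(QUERY, variables={"id": flow_group_id})
    schedule = result["data"]["flow_group"][0]["schedule"]
    cron_clocks = merge_cron_clocks(schedule, new_cron, new_params)
    return client.graphql(
        MUTATION,
        variables={"input": {"flow_group_id": flow_group_id, "cron_clocks": cron_clocks}},
    )
```

Because this reads the schedule and then overwrites it, two concurrent calls can clobber each other's clocks; the write is not atomic.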
A more practical way of modifying clocks that have `parameter_defaults` attached is to do it directly in your flow configuration and re-register your flow, e.g.:
```python
import datetime

import prefect
from prefect import task, Flow, Parameter
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

# Two clocks firing every minute; clock2 is offset by 30 seconds,
# and each supplies a different default for parameter "p".
clock1 = clocks.IntervalClock(start_date=now,
                              interval=datetime.timedelta(minutes=1),
                              parameter_defaults={"p": "CLOCK 1"})
clock2 = clocks.IntervalClock(start_date=now + datetime.timedelta(seconds=30),
                              interval=datetime.timedelta(minutes=1),
                              parameter_defaults={"p": "CLOCK 2"})


@task
def log_param(p):
    logger = prefect.context["logger"]
    logger.info("Received parameter value {}".format(p))


with Flow("Varying Parameters", schedule=Schedule(clocks=[clock1, clock2])) as flow:
    p = Parameter("p", default=None, required=False)
    log_param(p)
```
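To see what schedule those two clocks produce, here is a small pure-Python simulation (not Prefect's implementation) that merges several interval clocks into one time-ordered stream of events. It uses a fixed, hypothetical start time instead of `utcnow()` so the result is reproducible. Because `clock2` starts 30 seconds after `clock1` and both fire every minute, runs alternate between the two parameter values every 30 seconds.

```python
import datetime
import heapq


def merged_events(clocks, n):
    """Return the first n (fire_time, parameter_defaults) pairs across interval clocks.

    Each clock is a (start, interval, parameter_defaults) tuple. This only
    simulates how events from several clocks interleave in time order.
    """
    heap = [(start, i) for i, (start, _, _) in enumerate(clocks)]
    heapq.heapify(heap)
    events = []
    while len(events) < n:
        fire_time, i = heapq.heappop(heap)
        _, interval, params = clocks[i]
        events.append((fire_time, params))
        # Schedule this clock's next firing.
        heapq.heappush(heap, (fire_time + interval, i))
    return events


start = datetime.datetime(2021, 12, 11, 12, 0, 0)  # hypothetical fixed start
minute = datetime.timedelta(minutes=1)
clocks = [
    (start, minute, {"p": "CLOCK 1"}),
    (start + datetime.timedelta(seconds=30), minute, {"p": "CLOCK 2"}),
]
for fire_time, params in merged_events(clocks, 4):
    print(fire_time.time(), params["p"])
```

Running it prints `12:00:00 CLOCK 1`, `12:00:30 CLOCK 2`, `12:01:00 CLOCK 1`, `12:01:30 CLOCK 2`, one per line.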
Tilak Maddy
12/11/2021, 1:37 PM
Kevin Kho
Kevin Kho
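```python
from prefect import Client

client = Client()

# Find flow runs whose parameters contain {"x": 1}
# (Hasura's `_contains` filter on the jsonb `parameters` column).
response = client.graphql("""
    query($params: jsonb) {
      flow_run(where: {parameters: {_contains: $params}}) {
        name
        id
        parameters
      }
    }
    """,
    variables={"params": {"x": 1}})
print(response)
```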
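Hasura's `_contains` on a jsonb column maps to the Postgres `@>` containment operator, so a run whose parameters are `{"x": 1, "y": 2}` also matches the filter in that query. As a rough mental model, here is a hypothetical Python function approximating `@>` for the common cases; it is an illustration only and ignores the Postgres special case where a top-level array contains a bare scalar.

```python
def jsonb_contains(left, right):
    """Approximate Postgres `left @> right` for JSON-like Python values.

    Objects: every key of `right` must exist in `left` with a contained value.
    Arrays: every element of `right` must be contained in some element of `left`.
    Scalars: must be equal (JSON booleans never equal numbers).
    """
    if isinstance(right, dict):
        return isinstance(left, dict) and all(
            k in left and jsonb_contains(left[k], v) for k, v in right.items()
        )
    if isinstance(right, list):
        return isinstance(left, list) and all(
            any(jsonb_contains(item, r) for item in left) for r in right
        )
    if isinstance(left, bool) or isinstance(right, bool):
        # Python treats True == 1, but JSON true and 1 are different values.
        return left is right
    return left == right


runs = [{"x": 1, "y": "a"}, {"x": 2}, {"x": 1}]
matches = [r for r in runs if jsonb_contains(r, {"x": 1})]
```

Here `matches` keeps the first and third runs: extra keys on the stored parameters do not prevent a match.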
Tilak Maddy
12/11/2021, 9:24 PM
Tilak Maddy
12/11/2021, 9:34 PM
Kevin Kho
Tilak Maddy
12/11/2021, 10:28 PM
"You really need to register new flows with these new parameter sets"
But since the flow code is going to be exactly the same, what would you suggest I do? I use GitHub storage and the Vertex Agent. For now I only have one main flow file (obviously with just one flow name) that serves as the storage in the repo, and the execution environment is a Docker image that the Vertex Agent pulls from Docker Hub.
Tilak Maddy
12/11/2021, 10:29 PM
Kevin Kho
Kevin Kho
Tilak Maddy
12/12/2021, 11:54 AM
Anna Geller
`create_flow_run` task in a parent flow. You would first register your user's pipeline as a normal, independent flow. Then, any time the user wants to add a new pipeline (i.e. the same flow but with different parameters), instead of modifying the existing flow or appending clocks, you would register a new parent flow that triggers this child flow with the different parameters and schedule it to run at a specific time. This gives you the effect you want without any concurrency issues and without modifying the clocks.
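A minimal sketch of that parent-flow pattern, assuming Prefect 1.x, where `create_flow_run` is importable from `prefect.tasks.prefect` and `CronSchedule` from `prefect.schedules`. The function names, the naming scheme, and the values in the usage line are hypothetical. Each parent flow needs its own registered name, so `parent_flow_name` derives a deterministic one from the parameter set: re-registering the same pipeline reuses the name (a new version of the same flow) instead of adding a duplicate. The Prefect imports are deferred into the builder so the naming helper works without Prefect installed.

```python
import hashlib
import json


def parent_flow_name(child_flow_name, parameters):
    """Hypothetical naming scheme: one stable, unique name per parameter set."""
    payload = json.dumps(parameters, sort_keys=True).encode()
    digest = hashlib.sha1(payload).hexdigest()[:8]
    return f"{child_flow_name}-parent-{digest}"


def build_parent_flow(child_flow_name, project_name, parameters, cron):
    """Build a scheduled parent flow that triggers the child flow with `parameters`."""
    # Prefect 1.x imports (assumed API), deferred so this module loads without Prefect.
    from prefect import Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.prefect import create_flow_run

    name = parent_flow_name(child_flow_name, parameters)
    with Flow(name, schedule=CronSchedule(cron)) as parent:
        create_flow_run(
            flow_name=child_flow_name,
            project_name=project_name,
            parameters=parameters,
        )
    return parent
```

For example, `build_parent_flow("Varying Parameters", "my-project", {"p": "user-42"}, "0 9 * * *").register(project_name="my-project")` would register one scheduled parent for one user's parameter set (`my-project` and `user-42` are placeholders). The parent also needs the same storage and run config as your other flows, which this sketch leaves out.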
As for registering multiple flows, you can use the `prefect register` CLI to do that.
Kevin Kho