Fabio Scarpellini
04/21/2022, 7:41 PM
Kevin Kho
Nate
04/21/2022, 9:26 PM
import datetime
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"p": "CLOCK 2"},
)

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])
flow.schedule = schedule  # set the schedule on the Flow
flow.run()
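To see why two clocks offset by 30 seconds produce alternating runs with different parameters, here is a rough standard-library sketch. It is not how Prefect implements schedules; `interval_events` and `merged_events` are hypothetical helpers, and the fixed start time is made up so the result is reproducible.

```python
import datetime
import heapq
from itertools import islice


def interval_events(start, interval, params):
    """Yield (fire_time, params) forever, one event per interval."""
    fire_time = start
    while True:
        yield fire_time, params
        fire_time += interval


def merged_events(clock_specs, n):
    """Return the first n events across all clocks, ordered by fire time."""
    streams = [interval_events(*spec) for spec in clock_specs]
    # heapq.merge is lazy, so it is safe to use on infinite generators
    return list(islice(heapq.merge(*streams, key=lambda e: e[0]), n))


# Assumed fixed start time (the snippet above uses utcnow() instead)
start = datetime.datetime(2022, 4, 21, 21, 0, 0)
minute = datetime.timedelta(minutes=1)
clock_specs = [
    (start, minute, {"p": "CLOCK 1"}),
    (start + datetime.timedelta(seconds=30), minute, {"p": "CLOCK 2"}),
]

events = merged_events(clock_specs, 4)
for fire_time, params in events:
    print(fire_time.time(), params)
```

Each clock fires once a minute, so the merged schedule fires every 30 seconds and the parameter defaults alternate between the two clocks.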
Anna Geller
docker_storage.add_flow(flow)
Fabio Scarpellini
04/22/2022, 1:28 PM
def registration_func(company):
    with Flow(name=company, storage=docker_storage, run_config=kubernetes_run) as flow:
        docker_storage.add_flow(flow)
    flow.register(project_name="Dev", build=False)
Anna Geller
registration_func is the problem: your flow object won't be found by Prefect.
Can you try following my example from GitHub?
Fabio Scarpellini
04/22/2022, 1:58 PM
Anna Geller
Fabio Scarpellini
04/22/2022, 5:34 PM
Anna Geller
import datetime
from prefect.schedules import clocks, Schedule

now = datetime.datetime.utcnow()

clock1 = clocks.IntervalClock(
    start_date=now,
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"brand": "brand1"},
)
clock2 = clocks.IntervalClock(
    start_date=now + datetime.timedelta(seconds=30),
    interval=datetime.timedelta(minutes=1),
    parameter_defaults={"brand": "brand2"},
)

# the full schedule
schedule = Schedule(clocks=[clock1, clock2])
flow.schedule = schedule  # set the schedule on the Flow
Another possibility is to use a parametrized flow of flows, as shown in this Discourse topic.
It could look like this: your current flow becomes the child flow, and a parent flow triggers multiple child flow runs, one for each brand:
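from prefect import unmapped
from prefect.tasks.prefect import create_flow_run

# call this inside the parent flow's `with Flow(...)` block
mapped_flow_run_ids = create_flow_run.map(
    flow_name=unmapped("your_brand_flow"),
    project_name=unmapped("your_project_name"),
    # one parameters dict per child flow run
    parameters=[{"brand": "brand1"}, {"brand": "brand2"}, {"brand": "brand3"}],
)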
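Each mapped child run receives one element of the `parameters` list, and a flow run's parameters are a dict keyed by parameter name. Here is a minimal sketch of building that list from plain brand names, assuming the child flow's parameter is called `brand` as in the clock example above. `brand_parameters` is a hypothetical helper, not part of Prefect.

```python
def brand_parameters(brands, key="brand"):
    """Build one parameters dict per child flow run (hypothetical helper)."""
    return [{key: brand} for brand in brands]


parameters = brand_parameters(["brand1", "brand2", "brand3"])
print(parameters)
```

The resulting list could then be passed as the `parameters` argument to `create_flow_run.map`.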
Fabio Scarpellini
04/22/2022, 6:23 PM