Alexander Azzam
10/23/2023, 2:03 AM
from pydantic.v1 import
how’s that fare?
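A minimal sketch of what that pydantic.v1 import means, assuming pydantic 2 is installed (pydantic 2 ships the legacy v1 API under the pydantic.v1 namespace; the model below is purely illustrative):

# pydantic 2 bundles the old v1 API, so v1-style models keep working
# by changing only the import line.
from pydantic.v1 import BaseModel, Field

class ExampleConfig(BaseModel):
    # Field(default=...) and class Config behave as they did in pydantic 1
    name: str = Field(default="default")

    class Config:
        arbitrary_types_allowed = True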

Samuel Hinton
10/23/2023, 2:04 AM

Alexander Azzam
10/23/2023, 2:10 AM

Alexander Azzam
10/23/2023, 2:13 AM

Samuel Hinton
10/23/2023, 2:15 AM
prefect deploy etc, we have a repo of flows. We decorate each flow so it gets saved into a registry, and then in our CI pipeline we can get all flows in the registry and use Deployment.build_from_flow with the additional scheduling data passed in.
So effectively, the objects we care most about working are those which are passed into build_from_flow. Most of these are primitives, or (like the S3 block storage) common to all flows and thus not part of each individual flow configuration.
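A condensed sketch of the pattern being described, with illustrative names only (Samuel's full script later in the thread has the real details):

from prefect import flow
from prefect.deployments.deployments import Deployment
from prefect.server.schemas.schedules import RRuleSchedule

# Each decorated flow is recorded alongside its deployment settings,
# so CI can later turn every entry into a Prefect deployment.
REGISTRY = []

def register(schedule: RRuleSchedule, **deploy_kwargs):
    def wrapper(fn):
        REGISTRY.append({"flow": fn, "schedule": schedule, **deploy_kwargs})
        return fn
    return wrapper

@register(schedule=RRuleSchedule(rrule="FREQ=WEEKLY;BYDAY=MO"), name="example")
@flow
def my_flow():
    ...

# In CI: build and apply a deployment for every registered flow.
async def deploy_all():
    for entry in REGISTRY:
        await Deployment.build_from_flow(apply=True, **entry)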

Alexander Azzam
10/23/2023, 2:17 AM

Samuel Hinton
10/23/2023, 2:19 AM
deploy_flows.py script utilises that info

Alexander Azzam
10/23/2023, 2:19 AM

Samuel Hinton
10/23/2023, 2:21 AM
with temporary_settings({PREFECT_API_URL: _manager_.endpoint}):
Feel free to send a DM or an email to sh@arenko.group if it's more useful to have a video chat and I can run you through our code more easily (if that would be useful).

Samuel Hinton
10/23/2023, 5:37 AM
import asyncio
from typing import Any

from prefect import Flow, flow, task
from prefect.deployments.deployments import Deployment
from prefect.filesystems import S3
from prefect.server.schemas.schedules import RRuleSchedule
from prefect.settings import PREFECT_API_URL, temporary_settings
from pydantic import BaseModel, Field


class Environment(BaseModel):
    name: str
    endpoint: str
    production: bool
    s3_block: str = "flow-storage"
    ecs_block: str = "ecs-block"


class ConfigDeploy(BaseModel):
    # We have our own class to customise behaviour.
    # I considered subclassing the Deployment basemodel
    # but was cautious about it maybe changing and breaking things.
    schedule: RRuleSchedule
    name: str = "default"
    parameters: dict[str, Any] = Field(default={})
    flow: Flow = Field(default=None)
    enable_in_dev: bool = Field(default=False, exclude=True)  # excluded so it isn't passed to build_from_flow

    class Config:
        arbitrary_types_allowed = True


class Registry(BaseModel):
    registry: list[ConfigDeploy] = Field(default=[])

    def __call__(self, deployment: ConfigDeploy):
        # Used as a decorator factory: attach the flow to its deployment
        # config, record it, and return the flow unchanged.
        def wrapper(fn):
            deployment.flow = fn
            self.registry.append(deployment)
            return fn

        return wrapper


registry = Registry()


@task
def sum(a, b):
    return a + b


# Stacking the decorator registers the same flow twice, giving two deployments
# (the "default" one and "override1" with its own parameters).
@registry(ConfigDeploy(schedule=RRuleSchedule(rrule="FREQ=WEEKLY;BYDAY=MO;UNTIL=20240730T040000Z")))
@registry(
    ConfigDeploy(
        name="override1",
        schedule=RRuleSchedule(rrule="FREQ=WEEKLY;BYDAY=MO;UNTIL=20240730T040000Z"),
        parameters={"a": 2, "b": 3},
    )
)
@flow
def hello(a: int = 1, b: int = 2):
    return sum(a, b)


async def main():
    envs = [
        Environment(name="prod", endpoint="http://orion.orchestrator-prod.arenko:4200/api", production=True),
        Environment(name="dev1", endpoint="http://orion.orchestrator-dev1.arenko:4200/api", production=False),
        Environment(name="dev2", endpoint="http://orion.orchestrator-dev2.arenko:4200/api", production=False),
    ]
    for env in envs:
        # Point all Prefect client calls at this environment's API.
        with temporary_settings({PREFECT_API_URL: env.endpoint}):
            s3_block = await S3.load(env.s3_block)
            upload = True
            for deployment in registry.registry:
                # Schedules are only active in prod unless a flow opts into dev.
                active_schedule = env.production or deployment.enable_in_dev
                await Deployment.build_from_flow(
                    is_schedule_active=active_schedule,
                    storage=s3_block,
                    apply=True,
                    path="/",
                    load_existing=False,
                    skip_upload=not upload,
                    **deployment.dict(),
                )
                # Forgive me for using print over logging
                print(f"Deployed {deployment.flow.name}:{deployment.name} onto {env.name}")
                upload = False  # No need to upload this repo over and over


if __name__ == "__main__":
    asyncio.run(main())
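If Alexander's earlier pydantic.v1 suggestion were applied to this script, presumably only the pydantic import would change (a sketch, assuming pydantic 2 is installed alongside a Prefect release that still uses the v1 API internally):

# Sketch only: use the bundled v1 compatibility API so ConfigDeploy, Environment
# and Registry are built on the same pydantic generation as Prefect's own models
# (e.g. RRuleSchedule); the rest of the script stays as written above.
from pydantic.v1 import BaseModel, Field
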
alex
10/23/2023, 2:43 PM