# ask-community
a
Hey @Samuel Hinton! Quick question: if you swap that import to
from pydantic.v1 import
how’s that fare?
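(For reference, a minimal sketch of the suggested import swap, assuming pydantic 2.x, which exposes the old v1 API under the pydantic.v1 namespace; the model below is illustrative:)

from pydantic.v1 import BaseModel, Field  # v1-style API shipped inside pydantic 2.x

class ScheduleConfig(BaseModel):  # illustrative model written against the v1 API
    rrule: str = Field(default="FREQ=WEEKLY;BYDAY=MO")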
s
I'm not sure I follow the import you mean. But if I use pydantic v2 but keep all my models as v1, I can confirm it runs:
a
Sorry, meant import, not install (fixed above). Okay, so what's happening is that Prefect objects themselves are v1 classes, and v2 models don't accept v1 classes as field types. So this is expected but poorly communicated on our end. If you pass pydantic v2 objects to Prefect flows you're good, but if you subclass a Prefect object or use it as a field type, you can't currently commingle it with v2 objects. We're working on the above, but it'll be incremental. If you want to leave an issue to vote on how inconvenient this is, we can put more coal in the fire. If there are specific Prefect objects you're subclassing or attaching as field types (like in your screenshot), feel free to help us prioritize what would help you the most. Tagging my colleagues @Uriel Mandujano and @Chris Guidry, who are thinking about this with me, so they have 👀 too.
But enabling that commingling, like what you have in your screenshot, is definitely top of mind, and since it's incremental we're in a position to triage specific Prefect objects based on where we're seeing the biggest pain.
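(And a minimal sketch of the commingling failure described above, assuming pydantic 2.x; LegacyBlock is a hypothetical stand-in for a Prefect object, which is a pydantic v1 model:)

from pydantic import BaseModel  # v2 API
from pydantic.v1 import BaseModel as V1BaseModel

class LegacyBlock(V1BaseModel):  # stand-in for a Prefect object (a v1 model)
    bucket: str = "flows"

class DeployConfig(BaseModel):  # a v2 model trying to use a v1 model as a field type
    storage: LegacyBlock  # pydantic 2.x typically errors out here (a PydanticUserError /
                          # schema-generation error): v2 can't build a schema for v1 models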
s
Hmm, in that case I can always stop adding the schedules to my own deployment objects and construct them when needed. What we're doing (if it's useful) is, rather than using any of the CLI commands
prefect deploy
etc, we have a repo of flows. We decorate each flow so it gets saved into a registry, and then in our CI pipeline we can get all flows in the registry and use
Deployment.build_from_flow
with the additional scheduling data passed in. So effectively, the objects we most care about working are those which are passed into
build_from_flow
. Most of these are primitives, or (like the S3 block storage) common to all flows and thus not part of each individual flow configuration.
a
Oh man, that's 🤯. First tagging @alex since this resembles a lot of what he thinks about and works on. But making sure we're watertight on the commingling for common deployment patterns is super helpful feedback for us.
s
Yeah, we found the initial way of deploying flows for Prefect 2 really cumbersome, and having the deployment schedules/overrides/details in the same location as the flows, all in a dataclass, made it really intuitive for our developers to add/remove/change deployments without having to know anything else about how the
deploy_flows.py
script utilises that info.
a
Would love to follow up with more questions as I think about this, if you're up for a follow-up chat at some point. (I'm OOO at the moment, but I can shoot you a DM when I'm back to set up more time to chat?)
s
Yeah, sure thing. Oh, the other really neat thing is that this allows us to deploy all of our dev environments from one place by using
with temporary_settings({PREFECT_API_URL: _manager_.endpoint}):
Feel free to send a DM or an email to
sh@arenko.group
if it's more useful to have a video chat, and I can run you through our code more easily that way.
@alex for your convenience, I've made a stripped-down version of what we use to deploy everything in our CI:
import asyncio
from typing import Any

from prefect import Flow, flow, task
from prefect.deployments.deployments import Deployment
from prefect.filesystems import S3
from prefect.server.schemas.schedules import RRuleSchedule
from prefect.settings import PREFECT_API_URL, temporary_settings
from pydantic import BaseModel, Field


class Environment(BaseModel):
    name: str
    endpoint: str
    production: bool
    s3_block: str = "flow-storage"
    ecs_block: str = "ecs-block"


class ConfigDeploy(BaseModel):
    # We have our own class to customise behaviour
    # I considered subclassing the Deployment basemodel
    # But was cautious about it maybe changing and breaking things
    schedule: RRuleSchedule
    name: str = "default"
    parameters: dict[str, Any] = Field(default={})
    flow: Flow = Field(default=None)
    enable_in_dev: bool = Field(default=False, exclude=True)

    class Config:
        arbitrary_types_allowed = True


class Registry(BaseModel):
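    # Calling the registry instance with a ConfigDeploy returns a decorator that
    # records the flow alongside its config, so CI can later deploy everything in one pass.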
    registry: list[ConfigDeploy] = Field(default=[])

    def __call__(self, deployment: ConfigDeploy):
        def wrapper(fn):
            deployment.flow = fn
            self.registry.append(deployment)
            return fn

        return wrapper


registry = Registry()


@task
def sum(a, b):
    return a + b


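# Registering the same flow twice creates two deployments of it: "default" and "override1"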
@registry(ConfigDeploy(schedule=RRuleSchedule(rrule="FREQ=WEEKLY;BYDAY=MO;UNTIL=20240730T040000Z")))
@registry(
    ConfigDeploy(
        name="override1",
        schedule=RRuleSchedule(rrule="FREQ=WEEKLY;BYDAY=MO;UNTIL=20240730T040000Z"),
        parameters={"a": 2, "b": 3},
    )
)
@flow
def hello(a: int = 1, b: int = 2):
    return sum(a, b)


async def main():
    envs = [
        Environment(name="prod", endpoint="<http://orion.orchestrator-prod.arenko:4200/api>", production=True),
        Environment(name="dev1", endpoint="<http://orion.orchestrator-dev1.arenko:4200/api>", production=False),
        Environment(name="dev2", endpoint="<http://orion.orchestrator-dev2.arenko:4200/api>", production=False),
    ]

    for env in envs:
        with temporary_settings({PREFECT_API_URL: env.endpoint}):
            s3_block = await S3.load(env.s3_block)
            upload = True
            for deployment in registry.registry:
                active_schedule = env.production or deployment.enable_in_dev
                await Deployment.build_from_flow(
                    is_schedule_active=active_schedule,
                    storage=s3_block,
                    apply=True,
                    path="/",
                    load_existing=False,
                    skip_upload=not upload,
                    **deployment.dict(),
                )
                # Forgive me for using print over logging
                print(f"Deployed {deployment.flow.name}:{deployment.name} onto {env.name}")
                upload = False  # No need to upload this repo over and over


if __name__ == "__main__":
    asyncio.run(main())
a
Thanks for sharing @Samuel Hinton! What you built is very cool! We’ve heard other users ask for a UX where they can mark flows for deployment or drop them in a folder and have Prefect auto-discover and apply those deployments, which is precisely what you built! If/when you meet with @Alexander Azzam, I’d like to tag along to see what I can learn.