Hi Team, do you guys have any examples of an implementation of a "flow builder" Class that someone h...
b
Hi Team, do you guys have any examples of an implementation of a "flow builder" Class that someone has built? I have about 10 flows that are identical besides a few minor details. I do not want to use different parameters because I feel like it lacks the proper visibility in the UI and each flow should have its own name. I'm just unsure of how to implement something like this so that registration, running etc of the flow will still work. Can I simply have a class that is invoked with some arguments and then a method which when called runs the flow context manager with the flow inside of it?
Fro anyone that is interested and @Kevin Kho, I ended up going with something like this. A bit of inheritance that suits my need. A base class eg.
Copy code
from prefect import Flow

from python.helpers.clock import Clock
from python.helpers.run_config import RunConfig
from python.helpers.storage import Storage
from prefect.executors import LocalDaskExecutor

class BaseFlowBuilder:
    
    def __init__(self, bookmaker, bookmaker_name, cron, cpu, memory, dask=False, scheduler=None) -> None:
        self.bookmaker = bookmaker
        self.bookmaker_name = bookmaker_name
        self.cron = cron
        self.cpu = cpu
        self.memory = memory
        self.dask = dask
        self.scheduler = scheduler
        
    
    def build_flow(self):
        with Flow(
            name=f"{self.bookmaker_name}_flow",
            storage=Storage().in_a_s3_bucket(),
            run_config=RunConfig().fargate_on_ecs(cpu=self.cpu, memory=self.memory),
            schedule=Clock.set_cron(self.cron),
        ) as flow:
            
            self.execute_tasks()
             
            if self.dask:
                flow.executor = LocalDaskExecutor(scheduler=self.scheduler)
            
            return flow

    
    def execute_tasks(self):
        raise NotImplementedError("You must implement your flow task logic in the concrete class")
And then an example implementation
Copy code
from python.flow_builders.base_flow_builder import BaseFlowBuilder

class SpecificFlowBuilder(BaseFlowBuilder):
    
    def __init__(self, bookmaker, bookmaker_name, cron, cpu, memory, dask=False, scheduler=None) -> None:
         super().__init__(
        bookmaker = bookmaker,
        bookmaker_name = bookmaker_name,
        cron = cron,
        cpu = cpu,
        memory = memory,
        dask = dask,
        scheduler = scheduler,
        )
        
    def execute_tasks(self):
        # Add all my tasks here
I can then instantiate it and save a tonne of code like so !
Copy code
flow = SpecificFlowBuilder(
    bookmaker=Bookmaker,
    bookmaker_name="name",
    cron="3,18,33,48 7-23 * * *",
    cpu=512,
    memory=2048,
    dask=True,
    scheduler="threads"
).build_flow()
k
@Marvin archive “Flow Builder Class”