Ben Muller
09/13/2021, 8:10 PM
Ben Muller
09/13/2021, 10:08 PM
from prefect import Flow
from python.helpers.clock import Clock
from python.helpers.run_config import RunConfig
from python.helpers.storage import Storage
from prefect.executors import LocalDaskExecutor
class BaseFlowBuilder:
    """Template for building a Prefect flow from a handful of settings.

    Subclasses provide the task graph by overriding ``execute_tasks``;
    ``build_flow`` wraps that graph in a ``Flow`` configured from the values
    given to the constructor.
    """

    def __init__(
        self,
        bookmaker,
        bookmaker_name,
        cron,
        cpu,
        memory,
        dask=False,
        scheduler=None,
    ) -> None:
        """Store the settings that ``build_flow`` reads later.

        Args:
            bookmaker: Stored as-is; not read by this class, so it is
                available to subclasses.
            bookmaker_name: Prefix of the flow name (``"<name>_flow"``).
            cron: Value passed to ``Clock.set_cron`` to build the schedule.
            cpu: Value passed as ``cpu`` to ``RunConfig().fargate_on_ecs``.
            memory: Value passed as ``memory`` to
                ``RunConfig().fargate_on_ecs``.
            dask: When truthy, the built flow gets a ``LocalDaskExecutor``.
            scheduler: Scheduler forwarded to ``LocalDaskExecutor``. ``None``
                means "do not pass one", leaving the executor's own default.
        """
        self.bookmaker = bookmaker
        self.bookmaker_name = bookmaker_name
        self.cron = cron
        self.cpu = cpu
        self.memory = memory
        self.dask = dask
        self.scheduler = scheduler

    def _executor_kwargs(self):
        """Return the keyword arguments to construct the executor with."""
        # Forward the scheduler only when one was supplied, so that
        # ``dask=True`` with the default ``scheduler=None`` does not hand
        # ``None`` to the executor.
        # NOTE(review): LocalDaskExecutor appears to require a string
        # scheduler name - confirm against the installed Prefect version.
        if self.scheduler is None:
            return {}
        return {"scheduler": self.scheduler}

    def build_flow(self):
        """Create the flow, register the subclass's tasks and return it.

        Returns:
            The ``Flow`` named ``"<bookmaker_name>_flow"``, with a
            ``LocalDaskExecutor`` attached when ``self.dask`` is truthy.

        Raises:
            NotImplementedError: If the subclass does not override
                ``execute_tasks``.
        """
        with Flow(
            name=f"{self.bookmaker_name}_flow",
            storage=Storage().in_a_s3_bucket(),
            run_config=RunConfig().fargate_on_ecs(cpu=self.cpu, memory=self.memory),
            schedule=Clock.set_cron(self.cron),
        ) as flow:
            # Runs inside the Flow context so the subclass's task calls
            # happen while ``flow`` is the active flow.
            self.execute_tasks()

        if self.dask:
            flow.executor = LocalDaskExecutor(**self._executor_kwargs())
        return flow

    def execute_tasks(self):
        """Declare the flow's tasks; concrete builders must override this."""
        raise NotImplementedError("You must implement your flow task logic in the concrete class")
And then an example implementation:
from python.flow_builders.base_flow_builder import BaseFlowBuilder
class SpecificFlowBuilder(BaseFlowBuilder):
    """Example concrete builder: only the task logic differs from the base."""

    def __init__(
        self,
        bookmaker,
        bookmaker_name,
        cron,
        cpu,
        memory,
        dask=False,
        scheduler=None,
    ) -> None:
        """Forward every setting unchanged to ``BaseFlowBuilder``.

        The parameters mirror the base constructor one-for-one; this
        override adds no behaviour of its own.
        """
        super().__init__(
            bookmaker=bookmaker,
            bookmaker_name=bookmaker_name,
            cron=cron,
            cpu=cpu,
            memory=memory,
            dask=dask,
            scheduler=scheduler,
        )

    def execute_tasks(self):
        """Declare this flow's tasks (placeholder - currently adds none)."""
        # Add all my tasks here
I can then instantiate it and save a tonne of code, like so:
# Configure the builder first, then ask it for the finished flow.
flow_builder = SpecificFlowBuilder(
    bookmaker=Bookmaker,
    bookmaker_name="name",
    cron="3,18,33,48 7-23 * * *",  # minutes 3/18/33/48 of hours 7 through 23
    cpu=512,
    memory=2048,
    dask=True,
    scheduler="threads",
)
flow = flow_builder.build_flow()
Kevin Kho
Marvin
09/13/2021, 10:21 PM
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by