Sam Thomas

10/07/2022, 3:45 PM
I've been trying to integrate Prefect (2.x) with object-oriented programming and design patterns. As discussed in an earlier thread https://app.slack.com/client/TL09B008Y/CL09KU1K7/thread/CL09KU1K7-1658744762.332369 it's not straightforward to integrate Tasks and Flows with classes. I've got something that works, but I was wondering if there's a more elegant way. In this example, for simplicity, I'm making a process that gets a number and adds another number to it:
import prefect

class Maker():
    def __init__(self, val=5):
        self._val=val
        
    def make_number(self):
        return self._val
    
    @property
    def make_number_task(self):
        return prefect.Task(self.make_number)
    
class Adder():
    def __init__(self, val=3):
        self._val=val

    def add_number(self, x):
        return x+self._val
    
    @property
    def add_number_task(self):
        return prefect.Task(self.add_number)
    
class Calculator():
    def __init__(self):
        self.maker = Maker()
        self.adder = Adder()

    def _flow(self):
        a = self.maker.make_number_task()
        b = self.adder.add_number_task(a)
        return b
    
    @property
    def flow(self):
        return prefect.Flow(self._flow)
This seems to work.
c = Calculator()
c.flow()
<Prefect messages>
8
c.maker._val=10
c.flow()
<Prefect messages>
13
I'm wondering if there's a better way of doing it. Decorating class methods directly with prefect.flow or prefect.task doesn't work, because Prefect then treats "self" as a required argument, whereas wrapping the bound method when it is accessed (as above) seems to work.
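A minimal sketch of the likely reason, using only the standard library and no Prefect calls: a function taken from the class still lists "self" as a parameter, while a bound method taken from an instance does not. It assumes a wrapper inspects the signature of whatever callable it is given; the `Maker` class mirrors the one above.

```python
import inspect


class Maker:
    def __init__(self, val=5):
        self._val = val

    def make_number(self):
        return self._val


# Accessed on the class: a plain function that still expects `self`.
unbound = Maker.make_number
unbound_params = list(inspect.signature(unbound).parameters)

# Accessed on an instance: a bound method with `self` already supplied.
bound = Maker().make_number
bound_params = list(inspect.signature(bound).parameters)

print(unbound_params)  # ['self']
print(bound_params)    # []
print(bound())         # 5
```

Wrapping inside a property, as in the classes above, always hands the wrapper the bound version, which would explain why that approach works.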

Peyton Runyan

10/07/2022, 5:23 PM
What's your use case?

Sam Thomas

10/11/2022, 2:49 PM
Good question. I'm looking to design a framework that would make it easy for a development team to build processing flows that follow a standard pattern. Say, for example, I have one requirement to process data from source 1 using method A and then upload it to the database, and another to process data from source 2 with method B and then upload it to the same database, and I fully expect future requirements for new combinations of sources and methods. My template contains an Extractor, a Translator and a Loader, and looks something like:
class Template():
    def process(self):
        data = self.extractor.extract_data()
        new_data = self.translator.translate(data)
        self.loader.load(new_data)
If a new data source arrives, all I need to do is create a new Extractor class for that source, and I can build my workflows easily. I'd like Template.process() to be a Prefect flow and Extractor.extract_data() to be a task, so that I can orchestrate and deploy it through Prefect.
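A rough sketch of that template in plain Python, so it runs without Prefect. The component classes (`ListExtractor`, `DoublingTranslator`, `MemoryLoader`) and the `wrap` hook are hypothetical names made up for illustration. The assumption is that `wrap` could later be swapped for a Prefect wrapper applied to the bound methods, as in the earlier `prefect.Task(self.make_number)` example; that swap is not tested here.

```python
from typing import Callable


def identity(fn: Callable) -> Callable:
    """Stand-in wrapper (hypothetical): returns the callable unchanged."""
    return fn


class ListExtractor:
    """Hypothetical extractor that serves rows from an in-memory list."""

    def __init__(self, rows):
        self._rows = list(rows)

    def extract_data(self):
        return list(self._rows)


class DoublingTranslator:
    """Hypothetical translator that doubles every value."""

    def translate(self, data):
        return [x * 2 for x in data]


class MemoryLoader:
    """Hypothetical loader that stores results in a list."""

    def __init__(self):
        self.stored = []

    def load(self, data):
        self.stored.extend(data)


class Template:
    def __init__(self, extractor, translator, loader, wrap=identity):
        self.extractor = extractor
        self.translator = translator
        self.loader = loader
        self._wrap = wrap  # applied to each bound method before it is called

    def process(self):
        data = self._wrap(self.extractor.extract_data)()
        new_data = self._wrap(self.translator.translate)(data)
        self._wrap(self.loader.load)(new_data)
        return new_data


loader = MemoryLoader()
template = Template(ListExtractor([1, 2, 3]), DoublingTranslator(), loader)
result = template.process()
print(result)  # [2, 4, 6]
```

Supporting a new source then only requires another class with an `extract_data()` method; `Template` itself stays unchanged.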

Peyton Runyan

10/12/2022, 1:16 PM
I dig it! We actually just released a utility to make functionality like this a bit simpler. See the section on run_deployment: https://medium.com/the-prefect-blog/prefect-2-5-makes-modular-and-serverless-dataflows-easier-than-ever-44d65b625627
A way to make this work for your use case is to create a deployment for each class. Then you could create a parent flow that takes parameters for extract, transform, and load. You pass in the desired class for each component, and the parent flow runs the corresponding deployments.
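from prefect import flow
from prefect.deployments import run_deployment
from pydantic import BaseModel

# Extractor, Transformer and Loader below are the component base classes (not shown)
class PostgresExtractor(BaseModel):
    deployment: str = "my-postgres-deployment"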
    ...

@flow
def runner(extract: Extractor, transform: Transformer, load: Loader):
    # extract
    run_deployment(name=extract.deployment, parameters=dict(url=extract.url, count=extract.count))
    # transform
    run_deployment(name=transform.deployment, parameters=dict(method=transform.method))
    ...