Sam Thomas
10/07/2022, 3:45 PM
import prefect

class Maker():
    def __init__(self, val=5):
        self._val = val

    def make_number(self):
        return self._val

    @property
    def make_number_task(self):
        # Wrap the bound method at access time, so the task never sees `self`
        return prefect.Task(self.make_number)

class Adder():
    def __init__(self, val=3):
        self._val = val

    def add_number(self, x):
        return x + self._val

    @property
    def add_number_task(self):
        return prefect.Task(self.add_number)

class Calculator():
    def __init__(self):
        self.maker = Maker()
        self.adder = Adder()

    def _flow(self):
        a = self.maker.make_number_task()
        b = self.adder.add_number_task(a)
        return b

    @property
    def flow(self):
        return prefect.Flow(self._flow)
This seems to work.
c = Calculator()
c.flow()
<Prefect messages>
8
c.maker._val=10
c.flow()
<Prefect messages>
13
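A minimal sketch of why the property-based wrapping above can work where decorating the method directly in the class body fails. FakeTask is a hypothetical stand-in written for this illustration, not Prefect's Task class; the assumption is that the real wrapper, like this one, stores the function it is given and does not bind it to the instance on attribute access.

```python
class FakeTask:
    """Stand-in for a task wrapper (hypothetical; NOT Prefect's Task class)."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


class Decorated:
    def __init__(self, val=5):
        self._val = val

    # Wrapped at class-definition time: FakeTask stores the plain function.
    # FakeTask defines no __get__, so attribute access does not bind `self`.
    @FakeTask
    def make_number(self):
        return self._val


class Wrapped:
    def __init__(self, val=5):
        self._val = val

    def make_number(self):
        return self._val

    @property
    def make_number_task(self):
        # self.make_number is already bound, so the wrapper never needs `self`.
        return FakeTask(self.make_number)


def call_decorated():
    """Call the class-body-decorated method and report what happens."""
    try:
        return Decorated().make_number()
    except TypeError as exc:
        return f"TypeError: {exc}"


wrapped = Wrapped()
result_before = wrapped.make_number_task()  # reads _val at call time
wrapped._val = 10
result_after = wrapped.make_number_task()   # a fresh wrapper sees the new value
```

In this sketch, call_decorated() reports a TypeError about the missing `self` argument, while the property version returns 5 and then 10. A side effect of the property approach is that a new wrapper object is built on every access.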
I'm wondering if there's a better way of doing it. Wrapping class methods in prefect.flow or prefect.task doesn't work because it treats "self" like a required argument, but the above seems to work.
Peyton Runyan
10/07/2022, 5:23 PM
Sam Thomas
10/11/2022, 2:49 PM
class Template():
    def process(self):
        data = self.extractor.extract_data()
        new_data = self.translator.translate(data)
        self.loader.load(new_data)
If a new data source arrives, all I need to do is create a new Extractor class for that source, and I can build my workflows easily.
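As a rough illustration of that template idea in plain Python (no Prefect involved), the sketch below injects the three components into Template. ListExtractor, RangeExtractor, Doubler, and MemoryLoader are hypothetical names invented for this example; only Template.process() and the extract_data / translate / load method names come from the snippet above.

```python
class ListExtractor:
    """Hypothetical source that returns values held in memory."""

    def __init__(self, values):
        self._values = list(values)

    def extract_data(self):
        return list(self._values)


class RangeExtractor:
    """Hypothetical second source; supporting it needs only this new class."""

    def __init__(self, stop):
        self._stop = stop

    def extract_data(self):
        return list(range(self._stop))


class Doubler:
    """Hypothetical translator that doubles every value."""

    def translate(self, data):
        return [2 * x for x in data]


class MemoryLoader:
    """Hypothetical loader that keeps results in a list."""

    def __init__(self):
        self.stored = []

    def load(self, data):
        self.stored.extend(data)


class Template:
    def __init__(self, extractor, translator, loader):
        self.extractor = extractor
        self.translator = translator
        self.loader = loader

    def process(self):
        data = self.extractor.extract_data()
        new_data = self.translator.translate(data)
        self.loader.load(new_data)


# Same workflow, two different sources.
from_list = Template(ListExtractor([1, 2, 3]), Doubler(), MemoryLoader())
from_list.process()

from_range = Template(RangeExtractor(3), Doubler(), MemoryLoader())
from_range.process()
```

Swapping the source changes only the first constructor argument; process() itself stays untouched.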
I'd like Template.process() to be a Prefect flow and Extractor.extract_data() to be a task so that I can orchestrate and deploy it through Prefect.
Peyton Runyan
10/12/2022, 1:16 PM
run_deployment
https://medium.com/the-prefect-blog/prefect-2-5-makes-modular-and-serverless-dataflows-easier-than-ever-44d65b625627
One way to make this work for your use case is to create a deployment for each class. Then you could create a parent flow that takes parameters for extract, transform, and load. You pass in the desired class for each component, and the parent flow runs the matching deployments.
from prefect import flow
from prefect.deployments import run_deployment
from pydantic import BaseModel

class PostgresExtractor(BaseModel):
    deployment: str = "my-postgres-deployment"
    ...

@flow
def runner(extract: Extractor, transform: Transformer, load: Loader):
    # extract
    run_deployment(name=extract.deployment, parameters=dict(url=extract.url, count=extract.count))
    # transform
    run_deployment(name=transform.deployment, parameters=dict(method=transform.method))
    ...
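A minimal sketch of the parent-runner idea that can be run without a Prefect server. The function that starts a deployment is passed in as `run`; in a real setup that would presumably be run_deployment, with runner decorated by @flow. ComponentConfig, fake_run_deployment, the URL, and every deployment name other than "my-postgres-deployment" are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ComponentConfig:
    """Hypothetical config: a deployment name plus the parameters to send."""
    deployment: str
    parameters: dict = field(default_factory=dict)


def runner(extract, transform, load, run):
    """Trigger one deployment per stage, in extract/transform/load order.

    `run` is injected so this sketch does not need Prefect installed.
    """
    results = []
    for stage in (extract, transform, load):
        results.append(run(name=stage.deployment, parameters=stage.parameters))
    return results


calls = []


def fake_run_deployment(name, parameters):
    """Stand-in that records the call instead of contacting Prefect."""
    calls.append((name, parameters))
    return name


postgres = ComponentConfig(
    "my-postgres-deployment",
    {"url": "postgres://example", "count": 10},  # hypothetical values
)
transform = ComponentConfig("my-transform-deployment", {"method": "dedupe"})
load = ComponentConfig("my-load-deployment")

triggered = runner(postgres, transform, load, run=fake_run_deployment)
```

Each component carries its own deployment name and parameters, so supporting a new source means adding one more config object (and its deployment) rather than editing runner.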