Giang Hoang Le

12/18/2019, 2:54 AM
Hi Team, I have been developing a system based on Prefect Core. I have found that to do distributed programming, I need to move my functions into separate classes, and the classes themselves are not stateful. So I would like to ask: with current Docker container technology, is it possible to develop normal functions that can be used in a distributed environment without writing separate classes? Regards, Giang

Chris White

12/18/2019, 2:59 AM
Hi @Giang Hoang Le - perhaps Prefect’s functional API will suit your needs; you can decorate your functions with the prefect `task` decorator, and if your functions need to share any state, you can exchange the information like normal python functions would:
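```python
from prefect import Flow, task


@task
def return_some_state():
    return [1, 2, 3]


@task
def process_state(x):
    return [i + 1 for i in x]


# The output of the first task is passed to the second as a normal argument
with Flow("example") as flow:
    process_state(return_some_state)
```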
Let me know if that helps for your use case!

Giang Hoang Le

12/18/2019, 3:04 AM
Hi Chris, I am currently developing a system that uses OOP (polymorphism), and the tasks live in different classes for different sources. So I need to write tasks as classes that can be used in child classes. Therefore, I would like to ask: is it possible to write functions within a class and still use them in a Flow?

Chris White

12/18/2019, 3:06 AM
If you subclass the base `prefect.Task` class, you can override its `run()` method and then use it within a Flow; you can call as many other methods as you’d like within the `run` method and it should work as you would expect at runtime. For example: https://docs.prefect.io/core/tutorials/task-guide.html#subclassing-the-task-class

Giang Hoang Le

12/18/2019, 3:11 AM
```python
source = get_pipeline(source_name)
with Flow('S3 to <Redshift-s3://bank>_data/2019/*.csv') as flow:
    result_discover = source.discover()
    result_extract = source.extract()
    result_transform = source.transform()
    result_load = source.load()
    result_verify = source.verify()
flow.deploy()


def get_pipeline(source_name):
    if source_name == 's3':
        return S3
```
where S3 is a subclass of Source that overrides these methods to handle the differences between sources:
```python
class Source:
    """
    DataFlow is a base class for building various dataflows based on
    different inputs.
    """

    def __init__(
        self,
        report,
        parser_name='csv',
        target_name='redshift',
        profile_name=None,
        schema_name='public',
        load_mode='replace',
        frequency='daily',
        sql_script=None,
        **kwargs
    ):
        self.report = report
        self.parser_name = parser_name
        self.target_name = target_name
        self.profile_name = profile_name
        self.schema_name = schema_name
        self.load_mode = load_mode
        self.frequency = frequency
        self.time = time  # `time` is not defined in this snippet
        self.sql_script = sql_script
        super().__init__(**kwargs)
        self.fs = self._get_filesystem()

    def discover(self):
        raise NotImplementedError("Subclass must implement instance method")

    def extract(self):
        raise NotImplementedError("Subclass must implement instance method")

    def transform(self):
        raise NotImplementedError("Subclass must implement instance method")

    def load(self, data_source):
        return CopyRedshift(data_source)

    def verify(self):
        raise VerifyRedshift()
```
This is my idea!

Chris White

12/18/2019, 3:34 AM
Gotcha - I understand what you’re looking for now. I think this is tricky to implement, as Prefect can only track calls to things that are registered as “tasks”, and registering class methods as tasks introduces many complications around how to store the value of “self”. That being said, we might be able to introduce a factory base class that you could use as a mixin to allow for something like this. Would you mind opening an issue on GitHub describing your use case above? We’ll look into it!

Giang Hoang Le

12/18/2019, 3:40 AM
Hi Chris, Ok!!