Giang Hoang Le
12/18/2019, 2:54 AM

Chris White
12/18/2019, 2:59 AM
…`task` decorator, and if your functions need to share any state, you can exchange the information just as normal Python functions would:
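```python
from prefect import task, Flow

@task
def return_some_state():
    return [1, 2, 3]

@task
def process_state(x):
    # x receives the list returned by the upstream task at runtime
    return [i + 1 for i in x]

with Flow("example") as flow:
    process_state(return_some_state())
```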
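To make the data flow concrete without installing Prefect, here is a small stdlib-only sketch of what the flow above does conceptually: each function runs after its upstream functions, and their return values are handed in as arguments. `run_in_order` and the dictionary layout are hypothetical illustrations, not Prefect's execution engine (for example, there is no cycle detection).

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical mini "flow": name -> (function, names of upstream functions whose
# results become its positional arguments). Not Prefect's API.
Graph = Dict[str, Tuple[Callable[..., Any], List[str]]]


def return_some_state() -> List[int]:
    return [1, 2, 3]


def process_state(x: List[int]) -> List[int]:
    return [i + 1 for i in x]


def run_in_order(graph: Graph) -> Dict[str, Any]:
    """Run each function after its upstream dependencies, passing their results in."""
    results: Dict[str, Any] = {}

    def run(name: str) -> Any:
        if name not in results:
            fn, upstream = graph[name]
            # Resolve upstream results first (depth-first), then call the function.
            args = [run(dep) for dep in upstream]
            results[name] = fn(*args)
        return results[name]

    for name in graph:
        run(name)
    return results


flow = {
    "process_state": (process_state, ["return_some_state"]),
    "return_some_state": (return_some_state, []),
}
print(run_in_order(flow))  # {'return_some_state': [1, 2, 3], 'process_state': [2, 3, 4]}
```

The declaration order in `flow` does not matter here: the dependency list, not the dictionary order, decides what runs first.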
Let me know if that helps with your use case!

Giang Hoang Le
12/18/2019, 3:04 AM

Chris White
12/18/2019, 3:06 AM
…`prefect.Task` class, you can override its `run()` method and then use it within a Flow; you can call as many other methods as you'd like within the `run` method, and it should work as you would expect at runtime. For example: https://docs.prefect.io/core/tutorials/task-guide.html#subclassing-the-task-class

Giang Hoang Le
12/18/2019, 3:11 AM
```python
from prefect import Flow

source = get_pipeline(source_name)

with Flow('S3 to Redshift-s3://bank_data/2019/*.csv') as flow:
    result_discover = source.discover()
    result_extract = source.extract()
    result_transform = source.transform()
    result_load = source.load()
    result_verify = source.verify()

flow.deploy()
```
```python
def get_pipeline(source_name):
    if source_name == 's3':
        return S3
```
where `S3` is an `S3(Source)` subclass that overrides the functions that differ between sources.
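A dictionary lookup is one common way to write `get_pipeline` so that supporting a new source only means adding a new subclass and one registry entry. A minimal sketch, assuming hypothetical `S3Source` and `LocalFileSource` subclasses and a hypothetical `PIPELINES` registry (none of these come from Prefect or from the original code):

```python
from typing import Dict, Type


class Source:
    """Base class; subclasses override the steps that differ between sources."""

    def discover(self) -> str:
        raise NotImplementedError("Subclass must implement instance method")


class S3Source(Source):
    def discover(self) -> str:
        return "listing objects in S3"  # placeholder for real discovery logic


class LocalFileSource(Source):  # hypothetical second source, for illustration
    def discover(self) -> str:
        return "globbing local files"


# Hypothetical registry mapping a source name to its class.
PIPELINES: Dict[str, Type[Source]] = {
    "s3": S3Source,
    "local": LocalFileSource,
}


def get_pipeline(source_name: str) -> Type[Source]:
    try:
        return PIPELINES[source_name]
    except KeyError:
        raise ValueError(f"Unknown source: {source_name!r}") from None


source_cls = get_pipeline("s3")
print(source_cls().discover())  # listing objects in S3
```

Note that `get_pipeline` returns a class, so it has to be instantiated before its instance methods such as `discover()` can be called.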
```python
class Source:
    """
    Base class for building various dataflows based on different inputs.
    """

    def __init__(
        self,
        report,
        parser_name='csv',
        target_name='redshift',
        profile=None,
        schema_name='public',
        load_mode='replace',
        frequency='daily',
        sql_script=None,
        **kwargs
    ):
        self.report = report
        self.parser_name = parser_name
        self.target_name = target_name
        self.profile_name = profile
        self.schema_name = schema_name
        self.load_mode = load_mode
        self.frequency = frequency
        # `time` was undefined here; read it from kwargs if the caller passes it
        self.time = kwargs.pop('time', None)
        self.sql_script = sql_script
        super().__init__(**kwargs)
        self.fs = self._get_filesystem()

    def _get_filesystem(self):
        raise NotImplementedError("Subclass must implement instance method")

    def discover(self):
        raise NotImplementedError("Subclass must implement instance method")

    def extract(self):
        raise NotImplementedError("Subclass must implement instance method")

    def transform(self):
        raise NotImplementedError("Subclass must implement instance method")

    def load(self, data_source):
        return CopyRedshift(data_source)

    def verify(self):
        return VerifyRedshift()
```
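Following the suggestion above to subclass `prefect.Task` and override `run()`, one way to use a `Source` object inside a flow is to call its methods in sequence from `run()`, feeding each step's output into the next. The sketch below is stdlib-only: `Task` is a hypothetical stand-in for `prefect.Task` so that it runs without Prefect installed, and `SourceTask`, `S3Source`, and the step signatures (each method taking the previous step's result) are assumptions, not the interface from the original code.

```python
from typing import Any, List


class Task:
    """Hypothetical stand-in for prefect.Task: subclasses override run()."""

    def run(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError


class Source:
    def discover(self) -> List[str]:
        raise NotImplementedError("Subclass must implement instance method")

    def extract(self, paths: List[str]) -> List[str]:
        raise NotImplementedError("Subclass must implement instance method")

    def transform(self, rows: List[str]) -> List[str]:
        raise NotImplementedError("Subclass must implement instance method")

    def load(self, rows: List[str]) -> int:
        # Placeholder for the Redshift COPY step: report how many rows would load.
        return len(rows)


class S3Source(Source):
    def discover(self) -> List[str]:
        return ["2019/a.csv", "2019/b.csv"]  # placeholder object keys

    def extract(self, paths: List[str]) -> List[str]:
        return [f"row from {p}" for p in paths]

    def transform(self, rows: List[str]) -> List[str]:
        return [r.upper() for r in rows]


class SourceTask(Task):
    """Runs one source's pipeline steps in order inside a single task."""

    def __init__(self, source: Source) -> None:
        self.source = source

    def run(self) -> int:
        paths = self.source.discover()
        rows = self.source.extract(paths)
        rows = self.source.transform(rows)
        return self.source.load(rows)


print(SourceTask(S3Source()).run())  # 2
```

Wrapping all steps in one task keeps the ordering explicit in plain Python. The trade-off is that, in Prefect, retries and state are tracked per task, so a failure anywhere reruns the whole pipeline; making each step its own task (as in the `@task` example) lets each step be retried and monitored individually.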
Chris White
12/18/2019, 3:34 AM

Giang Hoang Le
12/18/2019, 3:40 AM