Hi, I have a list of flows that are very similar a...
# ask-community
a
Hi, I have a list of flows that are very similar and I wanted to create a factory function in order to make things easier to maintain. The code looks very similar to this:
from prefect import Flow, Parameter, task
from etl import prepare, load
from crawlers import crawler1, crawler2
from typing import Dict, Callable, String
prepare = task(prepare)
load = task(load)
def make_standard_pipeline(flow_name: String, func :Callable, func_params:Dict):
    
with Flow(flow_name) as flow:
        
params = {k: Parameter(k, default = v) for k,v in func_params.items()}
        
df = func(**params)
        
df = prepare(df)
        
df = load(df)
    
return flow
pipe1 = make_standard_pipeline('flow1', task(crawler1), {})
pipe2 = make_standard_pipeline('flow2', task(crawler2), {'type' : 'xxx'})
Locally this code runs fine and I can also register these on the prefect server. However, when I try to run the flows, only the first flow defined in the file runs successfully (I tested reordering the flows). For the other flows, I get Key Error saying it couldn't found task slug crawler2. Does anyone has hints on what could be causing this problem? Thank you.
a
@André Bonatto the prefect.task is supposed to be used as a decorator, e.g.:
Copy code
@task
def prepare():
to reduce code duplication, you could either subclass the Task class, or even easier, just import and call the reusable classes and functions inside of tasks, e.g.
Copy code
@task
def prepare():
    your_crawler_function() # or:
    crawler = YourCrawler()
    crawler.run()
a
Thanks @Anna Geller. I refactored my code to subclass the Task class directly and it worked.