André Bonatto
11/20/2021, 12:50 PM
from prefect import Flow, Parameter, task
from etl import prepare, load
from crawlers import crawler1, crawler2
from typing import Callable, Dict

prepare = task(prepare)
load = task(load)

def make_standard_pipeline(flow_name: str, func: Callable, func_params: Dict):
    with Flow(flow_name) as flow:
        params = {k: Parameter(k, default=v) for k, v in func_params.items()}
        df = func(**params)
        df = prepare(df)
        df = load(df)
    return flow

pipe1 = make_standard_pipeline('flow1', task(crawler1), {})
pipe2 = make_standard_pipeline('flow2', task(crawler2), {'type': 'xxx'})
Locally this code runs fine, and I can also register the flows on the Prefect server. However, when I try to run them, only the first flow defined in the file runs successfully (I tested reordering the flows). For the other flows, I get a KeyError saying it couldn't find the task slug crawler2. Does anyone have hints on what could be causing this problem?
Thank you.
Anna Geller
@task
def prepare():
To reduce code duplication, you could either subclass the Task class or, even easier, just import and call the reusable classes and functions inside of tasks, e.g.:
@task
def prepare():
    your_crawler_function()  # or:
    crawler = YourCrawler()
    crawler.run()
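A slightly fuller sketch of that suggestion, for illustration only. The bodies of `your_crawler_function` and `YourCrawler`, the `kind` argument, and the wrapper names `crawl_with_function` and `crawl_with_class` are hypothetical stand-ins, not code from this thread. The `@task` decorator is deliberately left off so the snippet runs without Prefect installed; in a real flow each wrapper would be decorated with `@task`.

```python
# Hypothetical stand-ins for the reusable code that would live in crawlers.py.
def your_crawler_function(kind="default"):
    """Pretend to crawl a source and return raw records."""
    return [{"kind": kind, "value": 1}, {"kind": kind, "value": 2}]


class YourCrawler:
    """Class-based variant of the same reusable logic."""

    def __init__(self, kind="default"):
        self.kind = kind

    def run(self):
        return your_crawler_function(self.kind)


# Thin, explicitly named wrappers around the reusable code.
# In a Prefect flow, each of these would carry the @task decorator
# (from prefect import task); it is omitted here so the sketch is standalone.
def crawl_with_function(kind="default"):
    return your_crawler_function(kind)


def crawl_with_class(kind="default"):
    crawler = YourCrawler(kind)
    return crawler.run()


if __name__ == "__main__":
    print(crawl_with_function("xxx"))
    print(crawl_with_class("xxx"))
```

With this layout the crawling logic stays in ordinary, importable functions and classes, and each task is a small named function defined at module level.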
André Bonatto
11/23/2021, 2:24 AM