
Emma Rizzi

03/21/2022, 3:52 PM
Hi! I'm trying to implement a Python library that gathers the tasks my ETL flows have in common, and I'm looking for some insights on the best way to do it with Prefect. Sharing more in thread:
I have a generic DataManager (meant to be subclassed) that looks like:
class DataManager():
    @task
    def download_data(self, source: str, destination: str):
        pass
And I tried to use that in a flow like:
@task
def get_manager():
    return DataManager()

with Flow("Flow") as flow:
    manager = get_manager()
    manager.download_data(...)
This fails during build with:
AttributeError: 'FunctionTask' object has no attribute 'download_data'
because of the `manager` object. I understand this is because of my definition of the Manager object, and I could remove the `@task` from my library's functions and use them as usual Python functions inside my flow. But I'm not sure if there's already a standardized way to implement tasks in a Python package? I haven't found anything in the docs yet.

Kevin Kho

03/21/2022, 3:57 PM
This is not because of the definition. It happens because the `with Flow(...)` block only builds the DAG; nothing is executed until `flow.run()`. While the flow is being built, the `get_manager` task hasn't actually run, so `manager` is of type `FunctionTask` (or `Task`) and doesn't have the `download_data` attribute. You need to access `.download_data` inside a task so that its execution is also deferred to runtime.
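To make the build-time versus run-time distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation: `ToyTask`, `bind`, and the `"src"`/`"dst"` arguments are hypothetical names made up for illustration. It only mimics the idea that calling a task while the flow is being built records a graph node instead of running the function.

```python
# Toy model only, NOT Prefect's implementation. All names here are hypothetical.


class DataManager:
    def download_data(self, source, destination):
        return f"{source} -> {destination}"


class ToyTask:
    """Stand-in for a node in the flow graph."""

    def __init__(self, fn, upstream=()):
        self.fn = fn
        self.upstream = upstream

    def bind(self, *upstream):
        # Build time: record the dependencies, do not call fn.
        return ToyTask(self.fn, upstream)

    def run(self):
        # Run time: evaluate upstream nodes first, then call fn on their results.
        args = [node.run() for node in self.upstream]
        return self.fn(*args)


get_manager = ToyTask(DataManager)

# Build time: `manager` is a ToyTask node, not a DataManager instance.
manager = get_manager.bind()
try:
    manager.download_data("src", "dst")
    build_error = None
except AttributeError as exc:
    build_error = str(exc)

# Moving the attribute access into another task defers it to run time,
# when the upstream node has been evaluated to a real DataManager.
download = ToyTask(lambda m: m.download_data("src", "dst")).bind(manager)
result = download.run()
```

In this sketch `build_error` holds the same kind of `AttributeError` as in the question, while `download.run()` succeeds because the method is only looked up once the manager actually exists.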

Emma Rizzi

03/22/2022, 7:52 AM
Thanks @Kevin Kho! Does this mean download_data should not be a task, since we could call it inside another task?

Anna Geller

03/22/2022, 10:45 AM
You can call a task within another task by calling its .run() method, but this is not encouraged in most use cases. Can you describe the problem you are trying to solve? Are you trying to reuse some functionality (your DataManager) across various flows?

Emma Rizzi

03/22/2022, 12:35 PM
@Anna Geller yes, I'm trying to gather some functionality into data managers (the main objective is to have different implementations depending on the backend used, and to abstract them away for developers). Is there a recommended way to implement Prefect Tasks in such cases, or should I stick to classic functions and call them inside Tasks?

Anna Geller

03/22/2022, 12:49 PM
You are spot-on: using the functional API is the right way to build reusable tasks. That way, you can build them as building blocks that you import and call within various flows. Using the functional API will also make it much easier to migrate to Prefect 2 (Orion) later, as the same syntax will, to a large extent, work the same way.
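One possible layout for such a library, as a rough sketch: keep the backend classes as plain Python and expose module-level functions as the reusable building blocks. The backend classes, the `BACKENDS` registry, and the returned strings below are hypothetical and only for illustration. In a Prefect 1.x package, the module-level function is where the `@task` decorator would go; it is left off here so the example runs without Prefect installed.

```python
from abc import ABC, abstractmethod


class DataManager(ABC):
    """Plain Python interface; no task decorators on the methods."""

    @abstractmethod
    def download_data(self, source: str, destination: str) -> str:
        ...


class LocalDataManager(DataManager):
    # Hypothetical backend, for illustration only.
    def download_data(self, source: str, destination: str) -> str:
        return f"local copy {source} -> {destination}"


class RemoteDataManager(DataManager):
    # Hypothetical backend, for illustration only.
    def download_data(self, source: str, destination: str) -> str:
        return f"remote fetch {source} -> {destination}"


# Hypothetical registry mapping a backend name to its implementation.
BACKENDS = {"local": LocalDataManager, "remote": RemoteDataManager}


def download_data(backend: str, source: str, destination: str) -> str:
    """Reusable building block that flows would import.

    Assumption: in a Prefect 1.x package this function would be decorated
    with @task, so the manager is created and used at run time.
    """
    manager = BACKENDS[backend]()
    return manager.download_data(source, destination)
```

Because the manager is instantiated inside the function body, nothing in a flow definition needs to touch the manager's methods at build time.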

Emma Rizzi

03/22/2022, 1:24 PM
Thanks! I'll keep working in that direction; we hope to try out Orion soon as well 🙂

Kevin Kho

03/22/2022, 2:29 PM
To be more precise, you can do:
with Flow(...):
    SomeClass().sometask()
and this will work, but if you do:
with Flow(...):
    some = task_that_returns_SomeClass()
    some.sometask()
this will not, because `some` is not evaluated until runtime, so you have to do:
@task
def do_sometask(x):
    return x.sometask()

with Flow(...):
    some = task_that_returns_SomeClass()
    result = do_sometask(some)