Emma Rizzi

    6 months ago
    Hi! I'm trying to implement a Python library that gathers the tasks my ETL flows have in common, and I'm looking for insights on the best way to do it with Prefect. Sharing more in thread:
    I have a generic DataManager (meant to be subclassed) that looks like:
    class DataManager():
        @task
        def download_data(self, source: str, destination: str):
            pass
    And I tried to use that in a flow like:
    @task
    def get_manager():
        return DataManager()
    
    with Flow("Flow") as flow:
        manager = get_manager()
        manager.download_data(...)
    This fails during build with
    AttributeError: 'FunctionTask' object has no attribute 'download_data'
    because of the `manager` object. I understand this is because of my definition of the Manager object. I could remove the `@task` decorator from my library's functions and use them as regular Python functions inside my flow, but I'm not sure whether there's already a standardized way to implement tasks in a Python package. I haven't found anything in the docs yet.
    Kevin Kho

    6 months ago
    This is not because of the definition. This is because the `with Flow...` block builds the DAG, but it's not executed until `flow.run()`. While it's being built, the `get_manager` task hasn't actually run, so `manager` is of type `FunctionTask` (or `Task`) and doesn't have the `download_data` attribute. You need to access `.download_data` inside a task so that its execution is deferred to runtime as well.
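To make the build-time versus runtime distinction concrete, here is a minimal toy sketch in plain Python. It is not Prefect's implementation: `ToyTask` is a hypothetical stand-in for a task placeholder, and the `DataManager` body is invented for illustration. It only shows why an attribute of the eventual result is missing while the graph is being built, and how wrapping the access in another deferred step moves it to runtime.

```python
# Toy model only: ToyTask is a hypothetical placeholder, not a Prefect class.

class DataManager:
    def download_data(self, source, destination):
        # Invented body, just so there is something to return.
        return f"{source} -> {destination}"


class ToyTask:
    """Remembers a callable and its inputs, but does not call it yet."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def run(self):
        # Resolve upstream placeholders first, then call the stored callable.
        resolved = [a.run() if isinstance(a, ToyTask) else a for a in self.args]
        return self.fn(*resolved)


# "Build time": nothing has run, so `manager` is a ToyTask, not a DataManager.
manager = ToyTask(DataManager)
has_attr_at_build_time = hasattr(manager, "download_data")

# Deferring the attribute access into another placeholder moves it to runtime.
download = ToyTask(lambda m: m.download_data("src", "dst"), manager)
result = download.run()
```

Here `has_attr_at_build_time` is `False` because the placeholder wraps the function rather than its return value, while `download.run()` succeeds because the manager exists by the time the lambda is called.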
    Emma Rizzi

    6 months ago
    Thanks @Kevin Kho! Does this mean download_data should not be a task, since we could call it inside another task?
    Anna Geller

    6 months ago
    You can call a task within another task by calling its .run() method, but this is generally not encouraged in most use cases. Can you describe the problem you are trying to solve? Are you trying to reuse some functionality (your DataManager) across various flows?
    Emma Rizzi

    6 months ago
    @Anna Geller Yes, I'm trying to gather some functionality into data managers (the main objective is to have different implementations depending on the backend used, and to abstract them for developers). Is there a recommended way to implement Prefect Tasks in such cases, or should I stick to classic functions and call them inside Tasks?
    Anna Geller

    6 months ago
    You are spot-on that using the functional API is the right way to build reusable tasks. This way, you can build them as building blocks that you import and call within various flows. Also, using the functional API will make it much easier to migrate to Prefect 2 (Orion) later, as the same syntax will, to a large extent, work the same way.
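One way the library side of this could look, as a rough sketch rather than an official Prefect pattern: the backend-specific managers stay plain Python classes, and a small module-level function is the piece a flow would wrap as a task. `InMemoryDataManager` and `download` are hypothetical names, and the in-memory "backend" stands in for a real one.

```python
from abc import ABC, abstractmethod


class DataManager(ABC):
    """Backend-agnostic interface; subclasses implement the actual transfer."""

    @abstractmethod
    def download_data(self, source: str, destination: str) -> str:
        ...


class InMemoryDataManager(DataManager):
    """Hypothetical backend that records 'downloads' in a dict."""

    def __init__(self) -> None:
        self.store = {}

    def download_data(self, source: str, destination: str) -> str:
        self.store[destination] = f"contents of {source}"
        return destination


def download(manager: DataManager, source: str, destination: str) -> str:
    # Plain function: a flow module could decorate or wrap this with
    # Prefect's @task, so the method call happens at runtime, not build time.
    return manager.download_data(source, destination)
```

Keeping the classes free of `@task` means the same library can be used outside Prefect, and each flow decides which functions to turn into tasks.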
    Emma Rizzi

    6 months ago
    Thanks! I'll keep working in that direction. We hope to try out Orion soon as well 🙂
    Kevin Kho

    6 months ago
    More like you can do
    with Flow(...):
        SomeClass().sometask()
    and this will work, but if you do
    with Flow(...):
        some = task_that_returns_SomeClass()
        some.sometask()
    this will not, because `some` is not evaluated until runtime, so you have to do:
    @task
    def do_sometask(x):
        return x.sometask()
    
    with Flow(...):
        some = task_that_returns_SomeClass()
        result = do_sometask(some)