Emma Rizzi
03/21/2022, 3:52 PM
class DataManager():
    @task
    def download_data(self, source: str, destination: str):
        pass
And I tried to use that in a flow like:
@task
def get_manager():
    return DataManager()
with Flow("Flow") as flow:
    manager = get_manager()
    manager.download_data(...)
This fails during build with AttributeError: 'FunctionTask' object has no attribute 'download_data', caused by the manager object.
I understand this is because of how I defined the manager object. I could remove the @task decorator from my library's functions and use them as regular Python functions inside my flow.
But I'm not sure whether there is already a standardized way to implement tasks in a Python package. I haven't found anything in the docs yet.
Kevin Kho
03/21/2022, 3:57 PM
with Flow... builds the DAG, but it’s not executed until flow.run(). While it’s being built, the get_manager task hasn’t actually run, so manager is of type FunctionTask (or Task) and doesn’t have the download_data attribute. You need to access .download_data inside a task to defer the execution of that to runtime as well.
Emma Rizzi
03/22/2022, 7:52 AM
Anna Geller
03/22/2022, 10:45 AM
Emma Rizzi
03/22/2022, 12:35 PM
Anna Geller
03/22/2022, 12:49 PM
Emma Rizzi
03/22/2022, 1:24 PM
Kevin Kho
03/22/2022, 2:29 PM
with Flow(...):
    SomeClass().sometask()
and this will work, but if you do
with Flow(...):
    some = task_that_returns_SomeClass()
    some.sometask()
this will not, because some is not evaluated until runtime, so you have to do:
@task
def do_sometask(x):
    return x.sometask()
with Flow(...):
    some = task_that_returns_SomeClass()
    result = do_sometask(some)
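
To illustrate the build-time versus runtime distinction described in this thread, here is a minimal sketch in plain Python. It is a toy model and does not use Prefect or reproduce how Prefect is implemented. DeferredTask, download, and the "src"/"dst" values are hypothetical names made up for the illustration. DataManager and get_manager follow the names from the question, and DataManager is given a trivial body so there is something to return.

```python
class DeferredTask:
    """Toy placeholder: stores a function and its inputs, runs nothing yet."""

    def __init__(self, fn, *inputs):
        self.fn = fn
        self.inputs = inputs

    def run(self):
        # Resolve upstream placeholders first, then call the wrapped function.
        args = [i.run() if isinstance(i, DeferredTask) else i for i in self.inputs]
        return self.fn(*args)


class DataManager:
    def download_data(self, source, destination):
        # Trivial body for the sketch; a real library would do the download here.
        return f"{source} -> {destination}"


def get_manager():
    return DataManager()


def download(manager, source, destination):
    # The attribute access happens here, at run time, on a real DataManager.
    return manager.download_data(source, destination)


# "Build" phase: manager is only a placeholder, not a DataManager.
manager = DeferredTask(get_manager)
build_time_has_method = hasattr(manager, "download_data")

# Wrapping the method call in its own deferred step postpones it to run time.
step = DeferredTask(download, manager, "src", "dst")

# "Run" phase: get_manager executes first, then download receives its result.
result = step.run()
```

In this sketch, build_time_has_method is False because the placeholder has no download_data attribute, which mirrors the AttributeError from the question. The call only succeeds once it is moved into its own deferred step, the same shape as do_sometask above.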