https://prefect.io logo
n

Nil

06/26/2023, 2:21 PM
Started playing around Prefect today and ran into a problem. I am not able to
submit
some functions as tasks.
Copy code
class A:
    @task
    def holiday_from_universe(self) -> UniverseHoliday:
        return self._load_source_table("holiday", UniverseHoliday()) #_load_source_table is from a different module

@flow()
def trigger_ingest(name="Ingest Flow"):
    a = A(universe)
    a.holiday_from_universe.submit()

if __name__ == "__main__":
    trigger_ingest()
When I try to run, I get
AttributeError: 'function' object has no attribute 'submit'
.I will appreciate some help here
n

Nate

06/26/2023, 3:39 PM
hi @Nil generally speaking I wouldn't make methods on classes prefect tasks - prefect can do a lot of things with the inputs to your tasks (for example caching outputs based on inputs), which gets unintuitive pretty quick when
self
is one of them however, calling tasks from methods on a class should work fine
🙌 1
is there a specific reason you chose to do it the first way?
or even having a class return the task object (that you would then call) should also work fine
n

Nil

06/26/2023, 3:51 PM
Its just that, we have code written in that manner and we are trying to fit into prefect without much refactoring
n

Nate

06/26/2023, 3:54 PM
gotcha, so maybe something like
Copy code
class A:
    def holiday_from_universe(self) -> UniverseHoliday:
        load_task = task(name="my load source task")(self._load_source_table)
        return load_task.submit("holiday", UniverseHoliday())

@flow()
def trigger_ingest(name="Ingest Flow"):
    a = A(universe)
    a.holiday_from_universe()

if __name__ == "__main__":
    trigger_ingest()
n

Nil

06/27/2023, 12:00 PM
this works for this task. Thanks! When do we use submit and when do I not? For the DaskRunner, I see we should use submit. However for the normal runner, submit also works and the tasks run in parallel
n

Nate

06/27/2023, 2:29 PM
so by default, if you use
submit
then we'll use the
ConcurrentTaskRunner
, so not truly parallel but will be a performance increase over sequential basically I'd say use
submit
when utilizing a specific task runner is important, since the task runner will not be used unless you
submit
a task run to it for example, if you chose the
DaskTaskRunner
for true parallelism, you'd definitely want to use
submit
to leverage that, but there's some cases where its just overcomplicating for simpler flows
✅ 1
n

Nil

06/28/2023, 3:06 PM
You were right, Prefect and OOP is not sticking together. Due to current test setup in the project, we have to pass dynamic inputs to the tasks/class methods. Now I have to pass
self
and other parameters somehow to make the tests run. Or we have to completely change how testing is done at that moment. Not sure if you have a better idea
n

Nate

06/28/2023, 3:10 PM
can you show what you mean by
Due to current test setup in the project, we have to pass dynamic inputs to the tasks/class methods
have to pass
self
this is what I was suggesting you should avoid if possible. When would you have to pass
self
if
@task
is not decorating a method on a class?
n

Nil

06/28/2023, 3:15 PM
This is how we test a class method, the inputs and outputs are declared like the following
Copy code
@pytest.mark.local_integration
def test_func1(db):
    executor.db_module_test(
        step=PIPELINE_STEP,
        testcase_name="func1",
        inputs=[Input()],
        expected_output=Output(),
        module_func=ClassA(PostgresUniverse(db)).func1,
        db=db,
    )
n

Nate

06/28/2023, 5:04 PM
hmm sorry I'm not sure I understand what you're doing / what the problem is from that
n

Nil

06/29/2023, 1:47 PM
This was solved by putting a @flow decorator