Nil
06/26/2023, 2:21 PMsubmit
some functions as tasks.
class A:
@task
def holiday_from_universe(self) -> UniverseHoliday:
return self._load_source_table("holiday", UniverseHoliday()) #_load_source_table is from a different module
@flow()
def trigger_ingest(name="Ingest Flow"):
a = A(universe)
a.holiday_from_universe.submit()
if __name__ == "__main__":
trigger_ingest()
When I try to run, I get AttributeError: 'function' object has no attribute 'submit'
.I will appreciate some help hereNate
06/26/2023, 3:39 PMself
is one of them
however, calling tasks from methods on a class should work fineNil
06/26/2023, 3:51 PMNate
06/26/2023, 3:54 PMclass A:
def holiday_from_universe(self) -> UniverseHoliday:
load_task = task(name="my load source task")(self._load_source_table)
return load_task.submit("holiday", UniverseHoliday())
@flow()
def trigger_ingest(name="Ingest Flow"):
a = A(universe)
a.holiday_from_universe()
if __name__ == "__main__":
trigger_ingest()
Nil
06/27/2023, 12:00 PMNate
06/27/2023, 2:29 PMsubmit
then we'll use the ConcurrentTaskRunner
, so not truly parallel but will be a performance increase over sequential
basically I'd say use submit
when utilizing a specific task runner is important, since the task runner will not be used unless you submit
a task run to it
for example, if you chose the DaskTaskRunner
for true parallelism, you'd definitely want to use submit
to leverage that, but there's some cases where its just overcomplicating for simpler flowsNil
06/28/2023, 3:06 PMself
and other parameters somehow to make the tests run. Or we have to completely change how testing is done at that moment. Not sure if you have a better ideaNate
06/28/2023, 3:10 PMDue to current test setup in the project, we have to pass dynamic inputs to the tasks/class methods
have to passthis is what I was suggesting you should avoid if possible. When would you have to passself
self
if @task
is not decorating a method on a class?Nil
06/28/2023, 3:15 PM@pytest.mark.local_integration
def test_func1(db):
executor.db_module_test(
step=PIPELINE_STEP,
testcase_name="func1",
inputs=[Input()],
expected_output=Output(),
module_func=ClassA(PostgresUniverse(db)).func1,
db=db,
)
Nate
06/28/2023, 5:04 PMNil
06/29/2023, 1:47 PM