Riley Hun
08/08/2020, 12:51 AM
# parent class
class ABC:
    def __init__(
        self,
        user,
        password
    ):
        self.user = user
        self.password = password

    # takes the SQL string, matching the conn.query(query) call further down
    def query(self, query):
        pass
I want to do something like this
# task
class Task_A(Task, ABC):
    def run(self):
        pass
Currently, I'm just doing this instead
@task
def task_a(
    user: str = None,
    password: str = None,
    date_col: str = None,
    dataset_id: str = None
):
    conn = ABC(user=user, password=password)
    query = f"SELECT DISTINCT {date_col} FROM EDW_DNA_DB.WI.{dataset_id}"
    query_result = conn.query(query)
    return query_result[date_col].tolist()
Jeremiah
08/08/2020, 1:30 AM
Riley Hun
08/08/2020, 7:10 AM
class SfTask(Task, ConnectToSnowflake):
    def __init__(self,
                 date_column: str = None,
                 dataset_id: str = None,
                 *args,
                 **kwargs
                 ):
        self.date_column = date_column
        self.dataset_id = dataset_id
        super(SfTask, self).__init__(*args, **kwargs)

    @defaults_from_attrs(
        "date_column",
        "dataset_id"
    )
    def run(self,
            bucket_name,
            date_column,
            dataset_id
            ):
        query = f"SELECT DISTINCT {date_column} FROM EDW_DNA_DB.WI.THINKNUM_{dataset_id}"
        query_result = self.query_to_df(query)
        return query_result[date_column].tolist()
But when I try to instantiate it, I get an error
TypeError: __init__() got an unexpected keyword argument 'bucket_name'
where 'bucket_name' is an init variable from the parent class
Jeremiah
08/08/2020, 3:21 PM
super call. For example, see https://stackoverflow.com/questions/34884567/python-multiple-inheritance-passing-arguments-to-constructors-using-super/34885285
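For anyone reading this thread later, here is a minimal sketch of what seems to be going on, using plain Python only. `FakeTask` is a hypothetical stand-in for the task base class, and this `ConnectToSnowflake` is a hypothetical stub with guessed init arguments (`user`, `password`, `bucket_name`), not the real class from the thread. With `class SfTask(Task, ConnectToSnowflake)`, a single `super().__init__(*args, **kwargs)` call goes to the first parent in the MRO, which rejects keyword arguments it does not know. One possible workaround, assuming the parents are not written for cooperative `super()`, is to initialise each parent explicitly with only its own arguments:

```python
class FakeTask:
    """Hypothetical stand-in for a task base class; accepts only its own options."""

    def __init__(self, name=None):
        self.name = name


class ConnectToSnowflake:
    """Hypothetical stub for the connection parent class (init args are assumed)."""

    def __init__(self, user=None, password=None, bucket_name=None):
        self.user = user
        self.password = password
        self.bucket_name = bucket_name


class BrokenSfTask(FakeTask, ConnectToSnowflake):
    def __init__(self, date_column=None, dataset_id=None, *args, **kwargs):
        self.date_column = date_column
        self.dataset_id = dataset_id
        # Everything left over is forwarded to FakeTask.__init__, the next
        # class in the MRO, which does not accept bucket_name.
        super().__init__(*args, **kwargs)


class SfTask(FakeTask, ConnectToSnowflake):
    def __init__(self, date_column=None, dataset_id=None,
                 user=None, password=None, bucket_name=None, **kwargs):
        self.date_column = date_column
        self.dataset_id = dataset_id
        # Initialise each parent explicitly with only the arguments it accepts.
        ConnectToSnowflake.__init__(
            self, user=user, password=password, bucket_name=bucket_name
        )
        FakeTask.__init__(self, **kwargs)


# The single super() call reproduces the kind of TypeError seen in the thread.
try:
    BrokenSfTask(date_column="as_of_date", bucket_name="my-bucket")
    broken_raised = False
except TypeError:
    broken_raised = True

# The explicit version routes each argument to the parent that owns it.
task = SfTask(date_column="as_of_date", dataset_id="jobs",
              user="riley", password="secret",
              bucket_name="my-bucket", name="sf-task")
```

The explicit calls trade some elegance for predictability: they do not depend on every class in the hierarchy forwarding `**kwargs` through `super()`, which is the approach the linked Stack Overflow answer discusses.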