Edmondo Porcu
05/04/2022, 3:03 AM
def my_task(param1, param2):
    return NewTaskSomething(param1, param2)
and then in the flow...
my_task_instance = my_task(param1, param2)
my_task_instance(param3)
However, this fails, saying that param1 and param2 are not specified. Maybe my_task should not be decorated with the @task decorator after all?
Kevin Kho
Is my_task a task or a function?
Edmondo Porcu
05/04/2022, 3:07 AM
Kevin Kho
If you have NewTaskSomething and it's a task class, you can use it like this:
new_task = NewTaskSomething(param1, param2)
with Flow(...) as flow:
    new_task(param3)
The first call is the init and the second call is the run method. The init is executed at registration time, and the run is executed when the flow runs.
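To make the two phases concrete, here is a toy sketch in plain Python. It is not Prefect's API: ToyTask and ToyFlow are hypothetical stand-ins that only mimic the behavior described above, where __init__ runs while the flow is being built and run() runs later, when the flow executes.

```python
# Toy model, NOT Prefect's API: it only mimics the two phases of a task class.
_current_flow = None  # the flow whose "with" block we are currently inside


class ToyFlow:
    def __init__(self):
        self.calls = []  # (task, args) pairs recorded while building the flow

    def __enter__(self):
        global _current_flow
        _current_flow = self
        return self

    def __exit__(self, *exc_info):
        global _current_flow
        _current_flow = None
        return False  # do not swallow exceptions

    def run(self):
        # Flow run time: only now is each task's run() method executed.
        return [task.run(*args) for task, args in self.calls]


class ToyTask:
    def __init__(self, *init_args):
        # Build/registration time: these values must already be known.
        self.init_args = init_args

    def __call__(self, *run_args):
        # Calling the instance inside the flow only records the call.
        _current_flow.calls.append((self, run_args))

    def run(self, *run_args):
        return self.init_args + run_args


new_task = ToyTask("param1", "param2")  # first call: __init__
with ToyFlow() as flow:
    new_task("param3")                  # second call: recorded, not run yet
print(flow.run())                       # [('param1', 'param2', 'param3')]
```

Nothing in run() happens until flow.run() is called, which is why anything passed to __init__ has to exist before that point.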
Even if I change this to:
with Flow(...) as flow:
    new_task = NewTaskSomething(param1, param2)
    new_task(param3)
The first line still runs at registration time, and the second one is executed at flow run time. param1 and param2 need to be defined at registration.
If you want to introduce dynamism at runtime, let your task take param1 and param2 at runtime:
new_task = NewTaskSomething()
with Flow(...) as flow:
    param1 = ...  # some other task
    param2 = ...  # some other task
    new_task(param1, param2, param3)
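A toy sketch of the same idea, again in plain Python rather than Prefect's API (Node, ToyFlow and new_task_run are hypothetical names): calling a task inside the flow only records a placeholder, and each placeholder is swapped for a real value when the flow runs. That is why values produced by other tasks have to reach a task through run() rather than __init__.

```python
# Toy model, NOT Prefect's API: calls inside the flow return placeholders
# that are only filled in when the flow runs.
class Node:
    """A recorded call whose result does not exist until run time."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class ToyFlow:
    def __init__(self):
        self.nodes = []

    def call(self, fn, *args):
        # Build time: record the call and hand back a placeholder.
        node = Node(fn, args)
        self.nodes.append(node)
        return node

    def run(self):
        # Run time: nodes were recorded in dependency order, so evaluate them in
        # order, swapping each placeholder argument for its computed result.
        results = {}
        for node in self.nodes:
            args = [results[a] if isinstance(a, Node) else a for a in node.args]
            results[node] = node.fn(*args)
        return results


def new_task_run(param1, param2, param3):
    # Every input arrives at run time; nothing is fixed in __init__.
    return f"{param1}-{param2}-{param3}"


flow = ToyFlow()
param1 = flow.call(lambda: "x")  # stands in for "some other task"
param2 = flow.call(lambda: "y")  # stands in for "some other task"
final = flow.call(new_task_run, param1, param2, "z")
results = flow.run()
print(results[final])  # x-y-z
```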
Edmondo Porcu
05/04/2022, 3:13 AM
def my_task_builder(param1, param2, param3):
    # build complex config with mostly defaults
    config = ...  # build complex config from param1, param2, param3
    return DatabricksJob(json=config)
def my_task_1(param1):
    return my_task_builder(param1, 'a', 'b')

def my_task_2(param1):
    return my_task_builder(param1, 'c', 'd')
with Flow(...) as flow:
    param1 = Parameter("param1")
    db_conn = secret
    my_task_1_instance = my_task_1(param1)
    my_task_2_instance = my_task_2(param1)
    my_task_1_instance(db_conn)
    my_task_2_instance(db_conn)
Kevin Kho
param1 is defined at runtime, but my_task_1 is defined at registration time. What you need to do is modify the run method to accept param1 instead of passing it through the init. This will let you parameterize it at runtime.
Edmondo Porcu
05/04/2022, 3:25 AM
Kevin Kho
from prefect import Flow, Parameter

def create_config(param1, param2, param3):
    # build complex config with mostly defaults
    config = ...  # build complex config from param1, param2, param3
    return config

my_task = DatabricksJob()
with Flow(...) as flow:
    param1 = Parameter("param1")
    db_conn = secret
    create_config_1 = create_config(param1, 'a', 'b')
    create_config_2 = create_config(param1, 'c', 'd')
    my_task_1_instance = my_task(create_config_1)
    my_task_2_instance = my_task(create_config_2)
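A toy sketch of this pattern, assuming plain Python stand-ins rather than Prefect's API (Parameter here is a hypothetical minimal class, and ToyDatabricksJob and the config keys are invented for illustration): the parameter value is only supplied when the flow runs, create_config is recorded as a deferred call and evaluated at that point, and a single job instance is reused because its config arrives through run().

```python
# Toy model, NOT Prefect's API: a Parameter only gets a value when the flow runs.
class Parameter:
    def __init__(self, name):
        self.name = name


class Node:
    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class ToyFlow:
    def __init__(self):
        self.nodes = []

    def call(self, fn, *args):
        node = Node(fn, args)  # build time: record only
        self.nodes.append(node)
        return node

    def run(self, parameters):
        results = {}

        def resolve(value):
            if isinstance(value, Parameter):
                return parameters[value.name]  # supplied at run time
            if isinstance(value, Node):
                return results[value]          # computed earlier in this run
            return value

        for node in self.nodes:  # recorded in dependency order
            results[node] = node.fn(*[resolve(a) for a in node.args])
        return results


def create_config(param1, param2, param3):
    # Hypothetical config shape standing in for the "complex config".
    return {"name": param1, "a": param2, "b": param3}


class ToyDatabricksJob:
    # Hypothetical stand-in for DatabricksJob: the config is a run() argument,
    # so one instance can be reused with configs built at run time.
    def run(self, json):
        return f"{json['name']}:{json['a']}{json['b']}"


my_task = ToyDatabricksJob()
flow = ToyFlow()
param1 = Parameter("param1")
config_1 = flow.call(create_config, param1, "a", "b")
config_2 = flow.call(create_config, param1, "c", "d")
job_1 = flow.call(my_task.run, config_1)
job_2 = flow.call(my_task.run, config_2)
results = flow.run({"param1": "nightly"})
print(results[job_1], results[job_2])  # nightly:ab nightly:cd
```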
Edmondo Porcu
05/04/2022, 4:04 AM
Kevin Kho
Edmondo Porcu
05/04/2022, 4:08 AM
Kevin Kho
from prefect import Flow, task

@task
def abc(x):
    return x + 1

with Flow(...) as flow:
    a = abc(1)
    abc(a)
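A toy sketch of the copy-on-call behavior, in plain Python (ToyTask and ToyFlow are hypothetical, not Prefect classes): each call binds a shallow copy of the task, so abc(1) and abc(a) end up as two separate nodes, and the second node receives the first node's result at run time.

```python
import copy


# Toy model, NOT Prefect's API: each call of a task inside the flow
# binds a fresh copy, so two calls become two separate nodes.
class ToyFlow:
    def __init__(self):
        self.nodes = []

    def run(self):
        results = {}
        for node in self.nodes:  # recorded in dependency order
            args = [results[a] if isinstance(a, ToyTask) else a for a in node.args]
            results[node] = node.fn(*args)
        return results


class ToyTask:
    def __init__(self, fn, flow):
        self.fn = fn
        self.flow = flow
        self.args = ()

    def __call__(self, *args):
        node = copy.copy(self)  # a new object for every call
        node.args = args
        self.flow.nodes.append(node)
        return node


flow = ToyFlow()
abc = ToyTask(lambda x: x + 1, flow)
a = abc(1)
b = abc(a)
results = flow.run()
print(a is b, len(flow.nodes))  # False 2
print(results[b])               # 3
```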
Those are different tasks, because each call inside the flow block makes a copy of the task. Make create_config a task and it will resolve at runtime. We defer everything to run time.
Edmondo Porcu
05/04/2022, 4:14 AM
Kevin Kho