# best-practices
e
Hello everyone! I am trying to write a task that returns another task, like so:
def my_task(param1, param2):
     return NewTaskSomething(param1, param2)

and then in the flow...
my_task_instance = my_task(param1,param2)
my_task_instance(param3)
however this fails saying that param1 and param2 are not specified. Maybe in reality my_task function should not be decorated with the `@mytask` decorator?
k
I am a bit confused what you are trying to do. Is `my_task` a task or function?
e
It is a function. I am using some of the provided Tasks, but I want to create their configuration in functions for reusability...
but I was looking at the task decorator doc, and I didn't understand from it what should be decorated as a task and what should not
it looked like everything is a task, even stuff that is not a task 😄
k
Ah I see what you are doing. So first of all, if I have `NewTaskSomething` and it’s a task class, I can use it like this:
new_task = NewTaskSomething(param1, param2)
with Flow(..) as flow:
    new_task(param3)
The first call is the `init` and the second call is the `run` method. The init runs at registration time, and the run is executed when the Flow runs. Even if I change this to:
with Flow(..) as flow:
    new_task = NewTaskSomething(param1, param2)
    new_task(param3)
the first call still happens at registration time, and the second one is executed at flow run time. `param1` and `param2` need to be defined during registration. If you want to introduce dynamism at runtime, let your task take `param1` and `param2` during runtime.
new_task = NewTaskSomething()
with Flow(..) as flow:
    param1 = ... # some other task
    param2 = ... # some other task
    new_task(param1, param2, param3)
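As a rough illustration of the init/run split described here, the following is a toy model in plain Python. It is not Prefect's implementation: `ToyTask`, `prefix`, and `retries` are made-up names, and the only point is which values must be known when the object is built and which can arrive later.

```python
class ToyTask:
    """Hypothetical stand-in for a task class; not Prefect's real Task."""

    def __init__(self, prefix="job"):
        # Runs once, while the flow is being defined ("registration time").
        # Anything passed here must already be a concrete value.
        self.prefix = prefix

    def run(self, name, retries=0):
        # Runs later, when the flow executes, so it can receive
        # values that were only produced at run time.
        return f"{self.prefix}:{name}:retries={retries}"


# Build time: only the constructor argument is fixed.
toy = ToyTask(prefix="databricks")

# Run time: the same instance is executed with a value computed late.
late_value = "-".join(["nightly", "load"])
result = toy.run(late_value, retries=2)
print(result)
```

Moving an argument from the constructor to `run` is what lets it depend on something that does not exist yet when the flow is defined.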
e
This is useful, let me move one step further
def my_task_builder(param1, param2, param3):
  # build complex config, mostly defaults
  config = ...  # build complex config from param1, param2, param3
  return DatabricksJob(json=config)

def my_task_1(param1):
  return my_task_builder(param1, 'a', 'b')

def my_task_2(param1):
  return my_task_builder(param1, 'c', 'd')

with Flow(..) as flow:
  param1 = Parameter("param1")
  db_conn = secret
  my_task_1_instance = my_task_1(param1)
  my_task_2_instance = my_task_2(param1)
  my_task_1_instance(db_conn)
  my_task_2_instance(db_conn)
Do you understand my problem? The Task has a constructor which takes a complex configuration, and I need to create two very similar tasks. So I want to create a helper function that can calculate the configuration starting from the few parameters.
k
Yeah the problem is that `param1` is defined at runtime, but `my_task_1` is called during registration time. What you need to do is modify the `run` method to accept `param1` instead of using the init. This will let you parameterize it during run time
e
Assuming that the task already allows that, how should I change that code?
k
Something like this
def create_config(param1, param2, param3):
  # build complex config, mostly defaults
  config = ...  # build complex config from param1, param2, param3
  return config

my_task = DatabricksJob()

with Flow(..) as flow:
  param1 = Parameter("param1")
  db_conn = secret
  create_config_1 = create_config(param1, 'a', 'b')
  create_config_2 = create_config(param1, 'c', 'd')
  my_task_1_instance = my_task(create_config_1)
  my_task_2_instance = my_task(create_config_2)
e
How come you only need one instance of the task?
k
Because the config is passed in at runtime, when the task actually executes, and you can have multiple copies of the task. No configuration is done in the constructor, so the same instance can be used with different configurations
e
I see. Will the backend track those as different tasks though?
I would like to read maybe more docs to ensure I understand
Also, how is the param resolved within my create_config?
k
yes they will be. even if you do:
@task
def abc(x):
    return x + 1

with Flow(...) as flow:
    a = abc(1)
    abc(a)
those are different tasks because it makes copies inside the flow block
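To make the "copies inside the flow block" idea concrete, here is a small toy model in plain Python. It is only a sketch under assumptions: `ToyFlow`, `ToyTask`, and `bind` are hypothetical names and do not reflect how Prefect stores tasks internally. It shows one template object producing a separate node for every call.

```python
import copy


class ToyFlow:
    """Hypothetical container that records one node per task call."""

    def __init__(self):
        self.nodes = []


class ToyTask:
    """Hypothetical task: binding it registers a copy, it does not run."""

    def __init__(self, name):
        self.name = name
        self.inputs = ()

    def bind(self, flow, *inputs):
        # Each call gets its own shallow copy, so a single template
        # instance can appear in the flow many times.
        node = copy.copy(self)
        node.inputs = inputs
        flow.nodes.append(node)
        return node


flow = ToyFlow()
abc = ToyTask("abc")

a = abc.bind(flow, 1)  # first node, fed a constant
b = abc.bind(flow, a)  # second node, fed the first node's output

print(len(flow.nodes), a is b, a is abc)
```

Because each call yields its own node, a backend that tracks nodes would see two entries even though only one `abc` object was ever written by hand.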
Oh sorry. Make `create_config` a task and it will resolve during runtime. We defer everything to run time
e
Right, if not, the param will be an instance of the object and not its value
k
Yeah exactly
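The difference between a plain helper and a task can be sketched with a toy deferred-evaluation model in plain Python. Everything here (`ToyParameter`, `ToyNode`, `toy_task`, `resolve`) is hypothetical and only imitates the behaviour discussed in this thread; it is not Prefect's API.

```python
class ToyParameter:
    """Hypothetical placeholder whose value is only known at run time."""

    def __init__(self, name):
        self.name = name


class ToyNode:
    """A recorded call: a function plus possibly unresolved arguments."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


def toy_task(fn):
    # Calling the wrapped function records a node instead of running fn.
    def wrapper(*args):
        return ToyNode(fn, args)
    return wrapper


def resolve(value, params):
    # Replace placeholders and upstream nodes with concrete values.
    if isinstance(value, ToyParameter):
        return params[value.name]
    if isinstance(value, ToyNode):
        return value.fn(*(resolve(a, params) for a in value.args))
    return value


def plain_config(cluster):
    return {"cluster": cluster}


@toy_task
def task_config(cluster):
    return {"cluster": cluster}


param1 = ToyParameter("param1")

# Plain function: runs immediately and captures the placeholder object.
eager = plain_config(param1)

# Task: execution is deferred until the parameter has a value.
deferred = task_config(param1)
result = resolve(deferred, {"param1": "prod"})

print(type(eager["cluster"]).__name__, result)
```

In this model the plain helper ends up holding the `ToyParameter` object itself, while the task version only sees the concrete value once `resolve` runs.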