# best-practices
e
Hello, it seems that the documentation for `DatabricksSubmitRun` is obsolete. It suggests passing the connection string as string-encoded JSON using `PrefectSecret`, but the class only accepts a dict
k
Ah, `DatabricksRunNow` is newer. Maybe you can try that instead? `DatabricksSubmitRun` is kept around for backward compatibility
e
the annoying thing is that now I need to use the databricks CLI to create a job and Prefect to run it?
This is not very useful; I would have expected a Task that creates a Job on Databricks too 😞
k
How would you create the job? You’d have a notebook ready and then just define the notebook and compute to run it?
e
I found it, it has a horrible name:
`DatabricksSubmitMultitaskRun`
it's the Jobs v2.1 API
k
Feedback is welcome because we’re working on the Prefect 2.0 version. What would you rename it to?
e
good question. I think you probably want to have a factory method:
```python
def submit_run(api_version: str) -> Task:
    ...
```
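(For illustration, a minimal sketch of what such a factory might look like — `submit_run` and the version strings are hypothetical, not part of Prefect's API; the two task classes are the real Prefect 1.x Databricks tasks, assuming the `prefect.tasks.databricks` import path:)
```python
from prefect import Task
from prefect.tasks.databricks import DatabricksSubmitRun, DatabricksSubmitMultitaskRun

def submit_run(api_version: str, **kwargs) -> Task:
    # Hypothetical factory: dispatch on the Databricks Jobs API version.
    if api_version == "2.0":
        return DatabricksSubmitRun(**kwargs)
    if api_version == "2.1":
        return DatabricksSubmitMultitaskRun(**kwargs)
    raise ValueError(f"unsupported Jobs API version: {api_version}")
```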
Can you help me with this? https://docs.prefect.io/api/latest/tasks/databricks.html#databrickssubmitmultitaskrun I don't understand how the example fits with what we discussed yesterday about sharing part of the configuration between two tasks
k
It’s just the same. You can use that large init, and then during runtime, override stuff or add more keyword arguments
e
but I still have the problem: should I define the function where I create the instance as a task?
k
Ah yes, your function can return an initialized task
e
that's what I was doing before, ugh
but instead all the "non-params" should be added within the `with Flow(...)` block, is that right?
k
No. You can define them outside:
```python
mytask = MyTask(a=1, b=2)

with Flow(...) as flow:
    mytask(a=3, b=4, c=3)
```
The run here will override `a` and `b`
or:
```python
with Flow(...) as flow:
    mytask(c=3)  # inherits a and b
```
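(A runnable sketch of this init-default / run-override pattern, assuming Prefect 1.x — `MyTask` is illustrative, but `defaults_from_attrs` is the utility Prefect's own tasks use for exactly this:)
```python
from prefect import Flow, Task
from prefect.utilities.tasks import defaults_from_attrs

class MyTask(Task):
    def __init__(self, a=None, b=None, **kwargs):
        # Values captured at registration (build) time.
        self.a = a
        self.b = b
        super().__init__(**kwargs)

    @defaults_from_attrs("a", "b")
    def run(self, a=None, b=None, c=None):
        # Keyword arguments passed in the flow block override the init values;
        # anything left as None falls back to the attribute set in __init__.
        return (a, b, c)

mytask = MyTask(a=1, b=2)

with Flow("override-demo") as flow:
    result = mytask(a=3, b=4, c=3)  # run() sees a=3, b=4, c=3

with Flow("inherit-demo") as flow2:
    result = mytask(c=3)  # run() sees a=1, b=2 from __init__
```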
e
let me try again 😄
there is something missing in that API
k
What is missing?
e
the shared cluster part
no BS, my fault
@Kevin Kho this is not working for me 😞
k
What is your error?
e
```python
job1 = my_factory_method1(spark_version, github_repo, git_ref)
job2 = my_factory_method2(spark_version, github_repo, git_ref)
job1(databricks_conn_secret=databricks_connstring)
job2(databricks_conn_secret=databricks_connstring)
```
so `my_factory_method1` / `my_factory_method2` both return a task
```
File "runner.py", line 122, in <module>
    job1(databricks_conn_secret=databricks_connstring)
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 662, in __call__
    new.bind(
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 703, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 3039, in bind
    return self._bind(args, kwargs)
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 2954, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'spark_version'
```
k
Can I see the factory?
e
sure
```python
def my_factory_method(param1, param2, param3, param4):
    return DatabricksSubmitMultitaskRun(...)

@task
def my_factory_method_1(param3, param4):
    return my_factory_method('fixed', 'fixed2', param3, param4)
```
k
Are the params defined at runtime or at registration time?
You can't have a task that returns another task:
```python
@task
def abc():
    return MyTask()
```
This will not work because `MyTask` needs to be instantiated at registration time, but the task body is executed at runtime. You can do:
```python
@task
def abc(param1):
    return MyTask(__init__stuff_here).run(run_stuff_here, param1)
```
and then:
```python
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
```
but then `MyTask` is not a task anymore; it just runs the Python code under the hood. `abc()` will be the task in your flow.
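(To make the pattern concrete, a self-contained toy version, assuming Prefect 1.x — `MyTask`, `greeting`, and `name` are illustrative stand-ins, not from the thread:)
```python
from prefect import Flow, Parameter, Task, task

class MyTask(Task):
    def __init__(self, greeting="Hello", **kwargs):
        self.greeting = greeting
        super().__init__(**kwargs)

    def run(self, name):
        return f"{self.greeting}, {name}!"

@task
def abc(param1):
    # MyTask is built and .run() is called inside the task body,
    # so run-time values such as Prefect Parameters are usable here.
    return MyTask(greeting="Hi").run(param1)

with Flow("run-inside-task") as flow:
    param1 = Parameter("param1", default="world")
    abc(param1)
```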
e
```python
def suite_job(spark_version, github_repo, git_ref, name, notebooks):
    return DatabricksSubmitMultitaskRun(
        run_name=f"{github_repo} - Integration tests - {git_ref}",
        tasks=...,  # task settings built from name, notebooks, spark_version
    )

@task
def suite_1_job(spark_version, github_repo, git_ref):
    notebooks = ...
    return suite_job(spark_version, github_repo, git_ref, "Test Suite 1 - Spark UDF", notebooks=notebooks)


@task
def suite_2_job(spark_version, github_repo, git_ref):
    notebooks = ...  # some code
    return suite_job(
        spark_version=spark_version,
        github_repo=github_repo,
        git_ref=git_ref,
        name="Test Suite 2 - MLFlow Integration",
        notebooks=notebooks,
    )


with Flow("hello-flow") as flow:
    spark_version = Parameter("spark_version")
    github_repo = Parameter("github_repo")
    git_ref = Parameter("git_ref")
    databricks_connstring = {
        'host': os.environ['DATABRICKS_HOST'],
        'token': os.environ['DATABRICKS_TOKEN']
    }

    job1 = suite_1_job(spark_version, github_repo, git_ref)
    job2 = suite_2_job(spark_version, github_repo, git_ref)
    job1(databricks_conn_secret=databricks_connstring)
    job2(databricks_conn_secret=databricks_connstring)
```
k
This still won't work, right? You just have a task that calls a function that returns another Task, so the task is still returning a Task.
The main problem is that you're using a Task to initialize another task
e
yes, but I need to do it to have params, right?
k
You can't initialize with Prefect Parameters because they exist at run time, but the init of a task needs to happen at build time. You can either do:
```python
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
```
and calling `.run()` will run the underlying task, or:
```python
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    MyTask()(param1)
```
where you don't initialize anything and just pass everything to the `run`, since the params are available there. Then `MyTask()` can be your wrapper around the Databricks task to add some logic and call functions
e
I am confused by `MyTask()`
k
Could you try the first way then, by explicitly calling `.run()`? Just replace `MyTask` with `DatabricksSubmitMultitaskRun`.
`MyTask` can be any task based on the Prefect `Task`
e
sorry, I am confused. What is `MyTask()`? My function is called `abc`, and the other is called `func()`
k
`MyTask` is the `DatabricksSubmitMultitaskRun`, but the setup will work with all tasks
e
How is `abc` used? It seems it is not used.
And `func`? I find this very confusing. I am an experienced programmer and I can't write DRY code with Prefect.
Can you maybe point me to documentation that clarifies this? Thanks a lot for your help
k
There is no doc that clarifies this. It's used in the flow. Ignore the second snippet and just look at this:
```python
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
```
`abc` is used in the flow, and `func` is used by `abc`
e
I now got it working. The important part is that I call `.run()` within the `@task`-annotated function
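(For reference, a minimal sketch of the pattern that finally worked, assuming Prefect 1.x; `suite_1_job` mirrors the snippets above, and the `tasks` settings stay elided as in the thread:)
```python
from prefect import task
from prefect.tasks.databricks import DatabricksSubmitMultitaskRun

def suite_job(github_repo, git_ref, notebooks):
    # Plain function: builds an initialized task (no Prefect Parameters here).
    return DatabricksSubmitMultitaskRun(
        run_name=f"{github_repo} - Integration tests - {git_ref}",
        tasks=...,  # task settings built from notebooks, elided
    )

@task
def suite_1_job(spark_version, github_repo, git_ref, databricks_conn_secret):
    # Calling .run() inside the @task body makes run-time Parameter
    # values and the connection dict available.
    notebooks = ...
    return suite_job(github_repo, git_ref, notebooks).run(
        databricks_conn_secret=databricks_conn_secret
    )
```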
k
Yes! Nice work!