https://prefect.io logo
#best-practices
Title
# best-practices
e

Edmondo Porcu

05/04/2022, 5:49 PM
Hello, it seems like that documentation for the DatabricksSubmitRun is obsolete. It suggests to pass the connection string as a string-encoded json using PrefectSecret, but the class only accepts a dict
k

Kevin Kho

05/04/2022, 5:52 PM
Ah the DatabricksRunNow is newer. Maybe you can try that instead? DatabricksSubmitRun is kept around for backward compatibility
e

Edmondo Porcu

05/04/2022, 6:22 PM
the annoying thing is that now I need to use databricks CLI to create a job and prefect to run it?
This is not very useful, I would have expected a Task to create a Job on Databricks too 😞
k

Kevin Kho

05/04/2022, 6:37 PM
How would you create the job? You’d have a notebook ready and then just define the notebook and compute to run it?
e

Edmondo Porcu

05/04/2022, 6:39 PM
I found it, it has an horrible name
Copy code
DatabricksSubmitMultitaskRun
is v2.1 api
k

Kevin Kho

05/04/2022, 6:40 PM
Feedback is welcome because we’re working on the Prefect 2.0 version. What would you rename it to?
e

Edmondo Porcu

05/04/2022, 6:41 PM
good question. I think you probably want to have a factory method:
Copy code
def submit_run(apiVersion:str) -> Task
Can you help me with this ? https://docs.prefect.io/api/latest/tasks/databricks.html#databrickssubmitmultitaskrun I don't understand how the example fits with what we discussed ysterday, sharing a part of the configuration between two tasks
k

Kevin Kho

05/04/2022, 6:44 PM
It’s just the same. You can use that large init, and then during runtime, override stuff or add more keyword arguments
e

Edmondo Porcu

05/04/2022, 6:47 PM
but I still have the problem, should I define the function where I create the instance as a task?
k

Kevin Kho

05/04/2022, 6:50 PM
Ah yes your function can return an initialized task
e

Edmondo Porcu

05/04/2022, 7:08 PM
that's what I was doing before, ugh
but instead all the "non params" should be added within the with Flow(...) is that right?
k

Kevin Kho

05/04/2022, 7:10 PM
No. You can define them outside
Copy code
mytask = MyTask(a=1,b=2)
with Flow(...) as flow:
    mytask(a=3,b=4,c=3)
The run here will override a and b
or
Copy code
with Flow(...) as flow:
    mytask(c=3) # inherits a and b
e

Edmondo Porcu

05/04/2022, 8:27 PM
let me try agian 😄
there is something missing in that API
k

Kevin Kho

05/04/2022, 8:33 PM
What is missing?
e

Edmondo Porcu

05/04/2022, 8:33 PM
the shared cluster part
no BS, y fault
my fault
@Kevin Kho this is not working for me 😞
k

Kevin Kho

05/04/2022, 9:15 PM
What is your error?
e

Edmondo Porcu

05/04/2022, 9:15 PM
Copy code
job1 = my_factory_method1(spark_version, github_repo, git_ref)
    job2 = my_factory_method2(spark_version, github_repo, git_ref)
    job1(databricks_conn_secret=databricks_connstring)
    job2(databricks_conn_secret=databricks_connstring)
so my_factory_method_1 / my_factory_method_2 both return a task
Copy code
File "runner.py", line 122, in <module>
    job1(databricks_conn_secret=databricks_connstring)
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 662, in __call__
    new.bind(
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 703, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 3039, in bind
    return self._bind(args, kwargs)
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 2954, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'spark_version'
k

Kevin Kho

05/04/2022, 9:16 PM
can i see the factory?
e

Edmondo Porcu

05/04/2022, 9:27 PM
sure
Copy code
def my_mfactory_method(param1,param2,param3,param4)
 return new DatabricksSubmitMultiTask
@task
def my_factory_method_1(param3,param4):
return my_factory_method('fixed', 'fixed2', param3, param4)
k

Kevin Kho

05/04/2022, 9:28 PM
Are param defined during runtime or registration time?
You can’t have a task that returns this other task
Copy code
@task
def abc():
    return MyTask()
will not work because the MyTask needs to be instantiated during the registration time but the task is executed during runtime. you can do:
Copy code
@task
def abc(param1):
    return MyTask(__init__stuff_here).run(run_stuff_here, param1)
and then:
Copy code
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
but then
MyTask
is not a task anymore. It just runs the Python code under the hood.
abc()
will be the task in your flow
e

Edmondo Porcu

05/04/2022, 9:35 PM
Copy code
def suite_job(spark_version, github_repo, git_ref, name, notebooks):
    return DatabricksSubmitMultitaskRun(
        run_name=f"${github_repo} - Integration tests - ${git_ref}",
        tasks=tasks

    )

@task
def suite_1_job(spark_version, github_repo, git_ref):
    notebooks = ...
    return suite_job(spark_version, github_repo, git_ref, "Test Suite 1 - Spark UDF", notebooks=notebooks)


@task
def suite_2_job(spark_version,github_repo, git_ref):
    notebooks = ... some code
    return suite_job(
        spark_version=spark_version,
        github_repo=github_repo,
        git_ref=git_ref,
        name = "Test Suite 2 - MLFlow Integration",
        notebooks=notebooks
    )



with Flow("hello-flow") as flow:
    # An optional parameter "people", with a default list of names
    spark_version = Parameter("spark_version")
    github_repo = Parameter("github_repo")
    git_ref = Parameter("git_ref")
    databricks_connstring = {
        'host': os.environ['DATABRICKS_HOST'],
        'token': os.environ['DATABRICKS_TOKEN']
    }

    job1 = suite_1_job(spark_version, github_repo, git_ref)
    job2 = suite_2_job(spark_version, github_repo, git_ref)
    job1(databricks_conn_secret=databricks_connstring)
    job2(databricks_conn_secret=databricks_connstring)
k

Kevin Kho

05/04/2022, 9:37 PM
This still won’t work right? You just have a task that calls a function that returns another Task but the task is still returning a Task
The main problem is you’re using a Task to initialize another task
e

Edmondo Porcu

05/04/2022, 9:53 PM
yes, but I need to do it to have params right?
k

Kevin Kho

05/04/2022, 10:00 PM
You can’t initialize with Prefect params because they exist at run time but the init of a task needs to happen at build time. You can either do:
Copy code
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
and calling
run
will run the underlying task or:
Copy code
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    MyTask()(param1)
where you don’t initialize anything and just pass everything to the
run
since the params are available there. Then
MyTask()
can be your wrapper around the Databricks task to add some logic and call functions
e

Edmondo Porcu

05/04/2022, 10:01 PM
I am confused MyTask()
k

Kevin Kho

05/04/2022, 10:02 PM
Could you run try the first way then by explicitly calling
.run()
, just replace
MyTask
with the
DatabricksSubmitMultitaskRun
MyTask
can be any task based on the Prefect Task
e

Edmondo Porcu

05/04/2022, 10:25 PM
sorry, I am confused. What is MyTask() ? my function is called abc, and the other is called func()
k

Kevin Kho

05/04/2022, 10:28 PM
MyTask is the
DatabricksSubmitMultitaskRun
but the setup will work with all tasks
e

Edmondo Porcu

05/05/2022, 12:01 AM
How is abc used? It seems it is not usd
and func? I find this very confusing. I am an experienced programmer and I can't write dry code with Prefect
Can you maybe point me to documentation which can clarify this? Thanks a lot for your help
k

Kevin Kho

05/05/2022, 1:19 AM
There is no doc that clarified this. It’s used in the flow. Ignore the second snippet just look at this:
Copy code
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
abc
is used in the flow and
func
is used by
abc
e

Edmondo Porcu

05/05/2022, 1:20 AM
I know got it working. The important part is that I use the .run within the @task annotated
k

Kevin Kho

05/05/2022, 1:21 AM
Yes! Nice work!
12 Views