Edmondo Porcu
05/04/2022, 5:49 PM

Kevin Kho
05/04/2022, 5:52 PM

Edmondo Porcu
05/04/2022, 6:22 PM

Kevin Kho
05/04/2022, 6:37 PM

Edmondo Porcu
05/04/2022, 6:39 PM
DatabricksSubmitMultitaskRun is v2.1 api

Kevin Kho
05/04/2022, 6:40 PM

Edmondo Porcu
05/04/2022, 6:41 PM
def submit_run(apiVersion: str) -> Task
Kevin Kho
05/04/2022, 6:44 PM

Edmondo Porcu
05/04/2022, 6:47 PM

Kevin Kho
05/04/2022, 6:50 PM

Edmondo Porcu
05/04/2022, 7:08 PM

Kevin Kho
05/04/2022, 7:10 PM
mytask = MyTask(a=1, b=2)
with Flow(...) as flow:
    mytask(a=3, b=4, c=3)
The run here will override a and b.
with Flow(...) as flow:
    mytask(c=3)  # inherits a and b
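Kevin's snippet can be emulated without Prefect. A minimal sketch, assuming a hypothetical stand-in class (this is not the real Prefect Task implementation): kwargs passed at instantiation act as defaults, and kwargs passed at call time override them.

```python
# Hypothetical stand-in illustrating the default/override behavior described
# above; not the real Prefect Task class.
class MyTask:
    def __init__(self, **defaults):
        # kwargs given at instantiation are stored as defaults
        self.defaults = defaults

    def run(self, **kwargs):
        # kwargs given at call time override the stored defaults
        return {**self.defaults, **kwargs}

mytask = MyTask(a=1, b=2)
print(mytask.run(a=3, b=4, c=3))  # overrides a and b: {'a': 3, 'b': 4, 'c': 3}
print(mytask.run(c=3))            # inherits a and b: {'a': 1, 'b': 2, 'c': 3}
```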
Edmondo Porcu
05/04/2022, 8:27 PM

Kevin Kho
05/04/2022, 8:33 PM

Edmondo Porcu
05/04/2022, 8:33 PM

Kevin Kho
05/04/2022, 9:15 PM

Edmondo Porcu
05/04/2022, 9:15 PM
job1 = my_factory_method1(spark_version, github_repo, git_ref)
job2 = my_factory_method2(spark_version, github_repo, git_ref)
job1(databricks_conn_secret=databricks_connstring)
job2(databricks_conn_secret=databricks_connstring)
  File "runner.py", line 122, in <module>
    job1(databricks_conn_secret=databricks_connstring)
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 662, in __call__
    new.bind(
  File "/Users/edmondoporcu/Library/Caches/pypoetry/virtualenvs/sapbio-databricks-pylib-template-6mSIgEMS-py3.8/lib/python3.8/site-packages/prefect/core/task.py", line 703, in bind
    callargs = dict(signature.bind(*args, **kwargs).arguments)  # type: Dict
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 3039, in bind
    return self._bind(args, kwargs)
  File "/Users/edmondoporcu/.pyenv/versions/3.8.13/lib/python3.8/inspect.py", line 2954, in _bind
    raise TypeError(msg) from None
TypeError: missing a required argument: 'spark_version'
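The bottom of this traceback is Python's own inspect module: Prefect binds the flow-time call arguments against the task's signature via Signature.bind, which raises TypeError when a required parameter is absent. A minimal reproduction, using a hypothetical function whose signature mirrors the factory methods above:

```python
import inspect

# hypothetical signature mirroring the factory methods in the snippet above
def submit(spark_version, github_repo, git_ref, databricks_conn_secret=None):
    ...

sig = inspect.signature(submit)
try:
    # only the connection secret is supplied, as in job1(...) above
    sig.bind(databricks_conn_secret={"host": "...", "token": "..."})
except TypeError as err:
    print(err)  # missing a required argument: 'spark_version'
```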
Kevin Kho
05/04/2022, 9:16 PM

Edmondo Porcu
05/04/2022, 9:27 PM
def my_factory_method(param1, param2, param3, param4):
    return DatabricksSubmitMultitaskRun(...)

@task
def my_factory_method_1(param3, param4):
    return my_factory_method('fixed', 'fixed2', param3, param4)
Kevin Kho
05/04/2022, 9:28 PM
@task
def abc():
    return MyTask()
will not work, because MyTask needs to be instantiated at registration time but the task is executed at runtime.
You can do:
@task
def abc(param1):
    return MyTask(__init__stuff_here).run(run_stuff_here, param1)
and then:
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
but then MyTask is not a task anymore. It just runs the Python code under the hood. abc() will be the task in your flow.

Edmondo Porcu
05/04/2022, 9:35 PM
def suite_job(spark_version, github_repo, git_ref, name, notebooks):
    return DatabricksSubmitMultitaskRun(
        run_name=f"{github_repo} - Integration tests - {git_ref}",
        tasks=tasks
    )

@task
def suite_1_job(spark_version, github_repo, git_ref):
    notebooks = ...
    return suite_job(spark_version, github_repo, git_ref, "Test Suite 1 - Spark UDF", notebooks=notebooks)

@task
def suite_2_job(spark_version, github_repo, git_ref):
    notebooks = ...  # some code
    return suite_job(
        spark_version=spark_version,
        github_repo=github_repo,
        git_ref=git_ref,
        name="Test Suite 2 - MLFlow Integration",
        notebooks=notebooks
    )

with Flow("hello-flow") as flow:
    spark_version = Parameter("spark_version")
    github_repo = Parameter("github_repo")
    git_ref = Parameter("git_ref")
    databricks_connstring = {
        'host': os.environ['DATABRICKS_HOST'],
        'token': os.environ['DATABRICKS_TOKEN']
    }
    job1 = suite_1_job(spark_version, github_repo, git_ref)
    job2 = suite_2_job(spark_version, github_repo, git_ref)
    job1(databricks_conn_secret=databricks_connstring)
    job2(databricks_conn_secret=databricks_connstring)
Kevin Kho
05/04/2022, 9:37 PM

Edmondo Porcu
05/04/2022, 9:53 PM

Kevin Kho
05/04/2022, 10:00 PM
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
and calling run will run the underlying task. Or:
with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    MyTask()(param1)
where you don't initialize anything and just pass everything to the run, since the params are available there. Then MyTask() can be your wrapper around the Databricks task to add some logic and call functions.

Edmondo Porcu
05/04/2022, 10:01 PM

Kevin Kho
05/04/2022, 10:02 PM
.run(), just replace MyTask with the DatabricksSubmitMultitaskRun. MyTask can be any task based on the Prefect Task.

Edmondo Porcu
05/04/2022, 10:25 PM

Kevin Kho
05/04/2022, 10:28 PM
DatabricksSubmitMultitaskRun, but the setup will work with all tasks.

Edmondo Porcu
05/05/2022, 12:01 AM

Kevin Kho
05/05/2022, 1:19 AM
def func():
    return MyTask(__init__stuff_here)

@task
def abc(param1):
    return func().run(run_stuff_here, param1)

with Flow(...) as flow:
    param1 = Parameter("param1", ...)
    abc(param1)
abc is used in the flow and func is used by abc.
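Kevin's final pattern can be sketched end to end without Prefect. In this hedged sketch, FakeDatabricksRun is a hypothetical stand-in for DatabricksSubmitMultitaskRun and the @task decorator is omitted; only the call structure is shown: func builds the task instance, and abc instantiates it and immediately calls .run(), so everything happens at runtime.

```python
# Hypothetical stand-in for DatabricksSubmitMultitaskRun: configured at
# __init__, executed via run(), like an imperative-style Prefect task.
class FakeDatabricksRun:
    def __init__(self, run_name):
        self.run_name = run_name

    def run(self, databricks_conn_secret=None):
        # a real task would submit the Databricks run here
        return f"submitted {self.run_name!r}"

def func(github_repo, git_ref):
    # builds the underlying task instance; called only from inside abc,
    # so instantiation happens at runtime, not registration time
    return FakeDatabricksRun(run_name=f"{github_repo} - Integration tests - {git_ref}")

# in Prefect 1.x this function would carry the @task decorator
# and be called inside a Flow block
def abc(github_repo, git_ref, databricks_conn_secret):
    return func(github_repo, git_ref).run(databricks_conn_secret=databricks_conn_secret)

print(abc("my-repo", "main", {"host": "...", "token": "..."}))
# submitted 'my-repo - Integration tests - main'
```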
Edmondo Porcu
05/05/2022, 1:20 AM

Kevin Kho
05/05/2022, 1:21 AM