Lone Pine Account
05/14/2021, 5:14 PMfrom prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksRunNow
import requests
conncfg = {"host":"<https://xxx.cloud.databricks.com/>", "token":"xxx"}
job_json = {
"job_id": 24242,
"notebook_params": {}
}
@task
def run_databricks(job_config):
notebook_run = DatabricksRunNow(json=job_config).run()
run_id = notebook_run(databricks_conn_secret=conncfg)
headers = { "Authorization" : "Bearer xxx" }
r = requests.get('<https://xxx.cloud.databricks.com/api/2.0/jobs/runs/get-output?run_id=%s'%(runid)>, headers=headers)
results = r.json()
output = results['metadata']['notebook_output']
if output.has_key('message'):
raise Exception(output['message'])
with Flow('testflow') as flow:
run_databricks(job_json)
Kevin Kho
Lone Pine Account
05/14/2021, 5:19 PMKevin Kho
DatabricksRunNow().run(json=job_config)
Lone Pine Account
05/14/2021, 5:22 PMLone Pine Account
05/14/2021, 5:22 PMLone Pine Account
05/14/2021, 5:23 PMKevin Kho
Kevin Kho
Lone Pine Account
05/14/2021, 5:30 PMLone Pine Account
05/14/2021, 5:30 PMLone Pine Account
05/14/2021, 5:31 PMLone Pine Account
05/14/2021, 5:31 PMKevin Kho
Lone Pine Account
05/14/2021, 5:31 PMLone Pine Account
05/14/2021, 5:32 PMKevin Kho
.run()
?Lone Pine Account
05/14/2021, 5:32 PMKevin Kho
DatabricksRunNow(json=job_config).run(databricks_conn_secret=conncfg)
Lone Pine Account
05/14/2021, 5:32 PMLone Pine Account
05/14/2021, 5:36 PMLone Pine Account
05/14/2021, 5:36 PMKevin Kho
Kevin Kho
Flow()
block calls the run
method of each task. If you passed it to the Task
inside the Flow
previously, you were passing it to the run
method under the hood.Lone Pine Account
05/14/2021, 5:41 PMLone Pine Account
05/14/2021, 5:41 PM