Lucas Hosoya
12/22/2021, 9:27 PM
Anna Geller
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksSubmitRun

notebook_run = DatabricksSubmitRun(json={
    "existing_cluster_id": "1007-999999-groom864",
    "spark_version": "8.1.x-scala2.12",
    "notebook_task": {
        "notebook_path": "/Users/anna@prefect.io/demo-notebook",
    },
})

with Flow('databricks') as flow:
    conn = PrefectSecret('DATABRICKS_CONNECTION_STRING')
    notebook_run(databricks_conn_secret=conn)
The Databricks notebook triggered above runs this at the very end:
import requests

# API_KEY is assumed to be defined earlier in the notebook (a Prefect Cloud API key).
create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""
inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
Note that this parameter could be, e.g., a string with an S3 object reference:
parameters=dict(x=6)
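As a minimal sketch of that idea, the mutation variables could carry an S3 URI instead of a number. The helper name `build_flow_run_inputs`, the parameter name `x`, and the bucket path are hypothetical, and the downstream flow would still need to load the object itself.

```python
import json


def build_flow_run_inputs(version_group_id: str, s3_uri: str) -> dict:
    """Build the `input` variable for the createFlowRun mutation (hypothetical helper)."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"expected an s3:// URI, got {s3_uri!r}")
    # The flow parameter `x` carries a reference to the data, not the data itself.
    return dict(versionGroupId=version_group_id, parameters=dict(x=s3_uri))


inputs = build_flow_run_inputs(
    "339c86be-5c1c-48f0-b8d3-fe57654afe22",
    "s3://my-bucket/results/output.parquet",  # hypothetical object key
)
# The variables must be JSON-serializable to be sent along with the GraphQL query.
body = json.dumps(dict(variables=dict(input=inputs)))
```

Passing a reference keeps the flow run parameters small, which matters when the notebook produces more data than is sensible to send through the API.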
Lucas Hosoya
12/22/2021, 9:42 PM
Anna Geller
alex
12/22/2021, 10:11 PM
DatabricksRunNow task. This task waits for the job run to complete and then returns the run ID. You could pass that to a task that queries the Databricks API to get the job run results (https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsGetOutput), which can then be used later in your flow. A task that retrieves job run results from Databricks would be very useful to have in the Prefect Task library, so if it works well for you, please feel free to submit a PR to add it!
Lucas Hosoya
01/05/2022, 12:25 PM
Maikel Penz
02/06/2022, 12:48 AM
DatabricksRunNow and DatabricksSubmitRun would be to return some attributes along with the run_id
Maikel Penz
02/06/2022, 12:51 AM
INFO - prefect.DatabricksSubmitRun | View run status, Spark UI, and logs at https://<account>.cloud.databricks.com/?o=12345#job/42954/run/96628
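One way to act on the two suggestions above is a small helper that takes the run ID returned by the Prefect task and asks the Jobs API for the run output. This is only a sketch and not part of the Prefect task library: the function names are hypothetical, the host and token are placeholders, and the response field names (`notebook_output.result`, `metadata.run_page_url`) reflect how I understand the Jobs API docs linked above, so check them against your workspace.

```python
import json
import urllib.parse
import urllib.request


def build_get_output_request(host: str, token: str, run_id: int) -> urllib.request.Request:
    """Build a GET request for the Jobs API runs/get-output endpoint."""
    query = urllib.parse.urlencode({"run_id": run_id})
    url = f"{host.rstrip('/')}/api/2.0/jobs/runs/get-output?{query}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def summarize_run_output(payload: dict) -> dict:
    """Pick out the attributes a downstream task is likely to need."""
    metadata = payload.get("metadata", {})
    return {
        "run_id": metadata.get("run_id"),
        "run_page_url": metadata.get("run_page_url"),
        "result_state": metadata.get("state", {}).get("result_state"),
        # Assumed to hold the value passed to dbutils.notebook.exit(...), if any.
        "notebook_result": payload.get("notebook_output", {}).get("result"),
    }


def get_run_output(host: str, token: str, run_id: int) -> dict:
    """Call the API and return the summarized output (needs network access)."""
    request = build_get_output_request(host, token, run_id)
    with urllib.request.urlopen(request) as response:
        return summarize_run_output(json.load(response))
```

Wrapped with Prefect's `@task` decorator, `get_run_output` could take the run ID returned by `DatabricksRunNow` as its input, so later tasks get the run page URL and notebook result without parsing logs.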
Anna Geller
Lucas Hosoya
02/09/2022, 1:54 PM
import json
from time import sleep

import requests

# The methods below belong to a class that stores the host, headers, and run_id.
def get_run_status(self):
    """
    Get run status in Databricks for a specific run/job id.

    Raises:
        Exception: Error in job by result state.
    Returns:
        self [DatabricksSubmitRun]
    """
    url = f"{self.__private_host}api/2.0/jobs/runs/get"
    self.state = None
    while self.state is None:
        # runs/get is a GET endpoint, so run_id goes in the query string.
        status_response = requests.get(url,
                                       headers=self.__private_headers,
                                       params={"run_id": self.run_id})
        status_resp = status_response.json()
        print(f"DatabricksSubmitRun in run state: {status_resp['state']}")
        print(f"View run status, Spark UI, and logs at {status_resp['run_page_url']}")
        # result_state is only present once the run has finished.
        self.state = status_resp['state'].get('result_state')
        if self.state is None:
            print('Sleeping for 10 seconds.')
            sleep(10)
            continue
        if self.state != 'SUCCESS':
            raise Exception(f"Error in job - Result State: {self.state}")
        print(f"Result State: {self.state}")
    return self

def submit_run(self, payload: str):
    """
    Submit run to Databricks host with a data payload.

    Args:
        payload (str): May be a string formatted json.
    Raises:
        Exception: If status code is not 200 (Success).
    Returns:
        self [DatabricksSubmitRun]
    """
    url = f"{self.__private_host}api/2.0/jobs/runs/submit"
    response = requests.post(url,
                             headers=self.__private_headers,
                             data=payload)
    # Check the status code before parsing: an error response has no run_id.
    if response.status_code != 200:
        raise Exception(f"Status code is {response.status_code}.")
    self.run_id = response.json()['run_id']
    print(f'Run id is: {self.run_id}')
    self.get_run_status()
    return self

def cancel_run(self):
    """
    Cancel a Databricks job/run by run_id.
    """
    url = f"{self.__private_host}api/2.0/jobs/runs/cancel"
    payload = json.dumps({
        "run_id": self.run_id
    })
    requests.post(url,
                  headers=self.__private_headers,
                  data=payload)
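The polling loop above runs until Databricks reports a result state, with no upper bound on how long it waits. A hedged variation, separate from the class above, is a standalone helper that gives up after a timeout. The names `wait_for_result` and `fetch_state` are hypothetical; `fetch_state` stands in for whatever call returns the `state` object of the runs/get response.

```python
import time
from typing import Callable, Optional


def wait_for_result(
    fetch_state: Callable[[], dict],
    timeout: float = 3600.0,
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until the run state carries a result_state, or raise on timeout."""
    deadline = clock() + timeout
    while True:
        state = fetch_state()
        result: Optional[str] = state.get("result_state")
        if result is not None:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"no result_state after {timeout} seconds")
        sleep(interval)
```

Passing `sleep` and `clock` as arguments is a design choice for testability: the loop can be exercised with fake states and no real waiting.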
Lucas Hosoya
02/09/2022, 1:56 PM
Maikel Penz
02/09/2022, 7:51 PM
Maikel Penz
02/09/2022, 7:53 PM
submit_run task you can take advantage of these integrations between DB and Prefect.
For the one that gets the result/status I’ve been using this library
Lucas Hosoya
02/10/2022, 3:04 PM