# ask-community
Lucas Hosoya:
Hi, is there a way to get a Databricks job/run result in Prefect? My goal is to run a Databricks task, then get the result and pass it as a parameter to the next task (via dbutils.notebook.exit). Wondering if that's possible.
Anna Geller:
I think the easiest way would be to use the DatabricksSubmitRun task; within your Databricks notebook you can then call another flow and, while doing so, pass some parameter value (e.g. a path to an S3 object) to that child flow. Here is how it could work:
```python
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksSubmitRun

notebook_run = DatabricksSubmitRun(
    json={
        "existing_cluster_id": "1007-999999-groom864",
        "spark_version": "8.1.x-scala2.12",
        "notebook_task": {
            "notebook_path": "/Users/anna@prefect.io/demo-notebook",
        },
    }
)

with Flow('databricks') as flow:
    conn = PrefectSecret('DATABRICKS_CONNECTION_STRING')
    notebook_run(databricks_conn_secret=conn)
```
At the very end, the Databricks notebook above calls:
```python
import requests

create_mutation = """
mutation($input: createFlowRunInput!){
    createFlowRun(input: $input){
        flow_run{
            id
        }
    }
}
"""

inputs = dict(
    versionGroupId="339c86be-5c1c-48f0-b8d3-fe57654afe22", parameters=dict(x=6)
)
# API_KEY is your Prefect Cloud API key
response = requests.post(
    url="https://api.prefect.io",
    json=dict(query=create_mutation, variables=dict(input=inputs)),
    headers=dict(authorization=f"Bearer {API_KEY}"),
)
print(response.status_code)
print(response.text)
```
Note that this parameter could be, for example, a string with an S3 object reference:
```python
parameters=dict(x=6)
```
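For reference, a minimal sketch of what the child flow on the receiving end might look like (the flow name and task here are made up for illustration); it just needs a Parameter matching the name used in the mutation:
```python
from prefect import task, Flow, Parameter

@task
def process_notebook_result(x):
    # x could be the S3 path (or any other value) passed along by the Databricks notebook
    print(f"Received from Databricks notebook: {x}")

with Flow("child-flow") as child_flow:
    x = Parameter("x", default=None)
    process_notebook_result(x)
```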
Lucas Hosoya:
Thanks for the reply! That might work, but it seems like some effort hahaha. Will try!
Anna Geller:
linking @alex in case he has a better idea - he knows more about Databricks
Alex:
An alternative approach would be to use the DatabricksRunNow task. This task waits for the job run to complete and then returns the run ID. You could pass that to a task that queries the Databricks API for the job run results (https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsGetOutput), which can then be used later on in your flow. A task that retrieves job run results from Databricks would be very useful in the Prefect task library, so if it works well for you, please feel free to submit a PR to add it!
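A rough sketch of how that could be wired up in Prefect 1.x (the job id, the extra secret names, and the get_run_output task are assumptions; the endpoint is the runs/get-output one linked above, and the wiring mirrors the DatabricksSubmitRun example earlier in the thread):
```python
import requests
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksRunNow

# Trigger an existing Databricks job by id (the job id here is made up)
run_now = DatabricksRunNow(json={"job_id": 42954})

@task
def get_run_output(run_id, host, token):
    # Query the Jobs API "runs/get-output" endpoint for the finished run;
    # notebook_output.result holds whatever dbutils.notebook.exit() returned.
    resp = requests.get(
        f"{host}/api/2.0/jobs/runs/get-output",
        headers={"Authorization": f"Bearer {token}"},
        params={"run_id": run_id},
    )
    resp.raise_for_status()
    return resp.json().get("notebook_output", {}).get("result")

with Flow("databricks-run-now") as flow:
    conn = PrefectSecret("DATABRICKS_CONNECTION_STRING")
    host = PrefectSecret("DATABRICKS_HOST")    # assumed secret with the workspace URL
    token = PrefectSecret("DATABRICKS_TOKEN")  # assumed secret with a personal access token
    run_id = run_now(databricks_conn_secret=conn)
    result = get_run_output(run_id, host, token)
```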
Lucas Hosoya:
Thank you!
Maikel Penz:
A good improvement to DatabricksRunNow and DatabricksSubmitRun would be to return some attributes along with the run_id. For example, something I'd like to do is get the URL or this output into a flow Artifact so it's easier for someone to troubleshoot a failed Databricks job:
```
INFO - prefect.DatabricksSubmitRun | View run status, Spark UI, and logs at https://<account>.cloud.databricks.com/?o=12345#job/42954/run/96628
```
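As a sketch of that idea, the run page URL could be published as a link artifact so it shows up on the flow run page. This assumes the URL is available in a variable; the exact artifacts module path differs between Prefect 1.x versions:
```python
from prefect import task
from prefect.backend.artifacts import create_link  # older releases: prefect.artifacts

@task
def publish_databricks_run_page(run_page_url: str):
    # Surface the Databricks run page as a clickable artifact on the flow run,
    # so a failed job is easy to troubleshoot straight from the Prefect UI.
    create_link(run_page_url)
```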
Anna Geller:
@Maikel Penz I agree that this would be useful. If this is a feature request, could you open a GitHub issue? Otherwise your idea will get lost in a Slack overflow 😉
Lucas Hosoya:
So, to make this improvement, I've actually built a new "DatabricksRun" class without using @task to make it an "operator"/task; it uses the run_id mentioned by @Maikel Penz:
```python
    # Methods of the custom DatabricksRun class; assumes `import json`, `import requests`,
    # and `from time import sleep` at module level.
    def get_run_status(self):
        """
        Get the run status in Databricks for a specific run/job id.

        Raises:
            Exception: If the run finished with a non-SUCCESS result state.

        Returns:
            self [DatabricksSubmitRun]
        """
        url = f"{self.__private_host}api/2.0/jobs/runs/get"
        status_payload = json.dumps({"run_id": self.run_id})
        self.state = None
        while self.state is None:
            status_response = requests.request(
                "GET", url=url, headers=self.__private_headers, data=status_payload
            )
            status_resp = json.loads(status_response.text)
            print(f"DatabricksSubmitRun in run state: {status_resp['state']}")
            print(f"View run status, Spark UI, and logs at {status_resp['run_page_url']}")
            try:
                # result_state only appears once the run has finished
                self.state = status_resp["state"]["result_state"]
                if self.state != "SUCCESS":
                    raise Exception(f"Error in job - Result State: {self.state}")
                print(f"Result State: {self.state}")
            except KeyError:
                print("Sleeping for 10 seconds.")
                sleep(10)

        return self

    def submit_run(self, payload: str):
        """
        Submit a run to the Databricks host with a data payload.

        Args:
            payload (str): A JSON-formatted string.

        Raises:
            Exception: If the status code is not 200 (success).

        Returns:
            self [DatabricksSubmitRun]
        """
        url = f"{self.__private_host}api/2.0/jobs/runs/submit"
        response = requests.request(
            "POST", url=url, headers=self.__private_headers, data=payload
        )

        if response.status_code != 200:
            raise Exception(f"Status code is {response.status_code}.")

        self.run_id = json.loads(response.text)["run_id"]
        print(f"Run id is: {self.run_id}")
        self.get_run_status()

        return self

    def cancel_run(self):
        """
        Cancel a Databricks job/run by run_id.
        """
        url = f"{self.__private_host}api/2.0/jobs/runs/cancel"
        payload = json.dumps({"run_id": self.run_id})
        requests.request(
            "POST", url=url, headers=self.__private_headers, data=payload
        )
```
The code is a bit big because of the run_id checks and the new cancel_run feature, which I can hook into a state_handler so that whenever Prefect cancels the flow/task, the Databricks job/run gets cancelled too.
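A minimal sketch of what such a state handler could look like in Prefect 1.x (databricks_run is a hypothetical instance of the custom class above):
```python
from prefect.engine.state import Cancelled

def cancel_databricks_run_handler(task, old_state, new_state):
    # If Prefect cancels the task, cancel the corresponding Databricks run as well.
    # Attach it via e.g. @task(state_handlers=[cancel_databricks_run_handler]).
    if isinstance(new_state, Cancelled):
        databricks_run.cancel_run()  # hypothetical DatabricksRun instance from above
    return new_state
```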
Maikel Penz:
Thanks @Anna Geller. Just created a request here 🙂. The way we are tackling missing integrations at the moment is to put them inside our own library (on top of Prefect). Once they are available in Prefect, we switch to the Prefect library.
@Lucas Hosoya that's pretty cool. FYI, on your submit_run task you can take advantage of these integrations between DB and Prefect. For the one that gets the result/status, I've been using this library.
Lucas Hosoya:
Yeah, I'm also using the databricks-api for the result/status; the run_id is really important for keeping track of the flow lifecycle.
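A minimal sketch of polling a run with the databricks-api wrapper (the host, token, and run id are placeholders; the method names come from the databricks_cli JobsService it wraps):
```python
from databricks_api import DatabricksAPI

db = DatabricksAPI(host="https://<account>.cloud.databricks.com", token="<token>")

# Fetch the run by the run_id returned when the job was submitted/triggered
run = db.jobs.get_run(run_id=12345)
state = run["state"]
print(state.get("life_cycle_state"), state.get("result_state"))
```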