# ask-community
l
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.databricks import DatabricksRunNow
import requests


conncfg = {"host": "https://xxx.cloud.databricks.com/", "token": "xxx"}


job_json = {
    "job_id": 24242,
    "notebook_params": {},
}


@task
def run_databricks(job_config):
    notebook_run = DatabricksRunNow(json=job_config).run()
    run_id = notebook_run(databricks_conn_secret=conncfg)
    headers = { "Authorization" : "Bearer xxx" }
    r = requests.get('https://xxx.cloud.databricks.com/api/2.0/jobs/runs/get-output?run_id=%s' % run_id, headers=headers)
    results = r.json()
    output =  results['metadata']['notebook_output']
    if 'message' in output:
        raise Exception(output['message'])



with Flow('testflow') as flow:
    run_databricks(job_json)
k
Hi! Will look into this.
l
cool - thx
k
I’m not 100% sure this will work but can you try moving the job_config into the run?
DatabricksRunNow().run(json=job_config)
l
lol - ya I tried that
but I'll try again
ya same error
k
Ok I’ll try to get a working example on my end
You got this task working on its own, right? Did you supply a connection string when you ran it, apart from the json?
l
ya - I was kicking off that job without issues when I included the DatabricksRun task within the flow
I did supply a connection string in that case
I can send over an example that works if that helps
(sans secrets)
k
I think your error is just asking for that connection string when you use the DatabricksRunNow task right?
l
ya that's what the error is showing
I can try hardcoding that in the run_databricks function - maybe that helps
k
Ahh can you insert that databricks_conn_secret inside .run()?
l
ah
k
DatabricksRunNow(json=job_config).run(databricks_conn_secret=conncfg)
l
ok I'll try that
you're a genius!
I think that did it - thx again
k
Glad you got it working. Just a small syntax issue 🙂. Happy to help.
Just so you understand it, the Flow() block calls the run method of each task. If you passed it to the Task inside the Flow previously, you were passing it to the run method under the hood.
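A rough sketch of the constructor-versus-run() distinction, for anyone reading later. This is a toy stand-in written in plain Python, not Prefect's actual implementation: the class name ToyRunNow, its conn_secret parameter, and the returned dict are all made up for illustration. It only assumes the general pattern described above, where values given at construction act as defaults and calling the task forwards its keyword arguments to run().

```python
# Toy model only: ToyRunNow and its parameter names are hypothetical,
# chosen to mirror the shape of the task discussed in this thread.
class ToyRunNow:
    def __init__(self, conn_secret=None, json=None):
        # Values passed to the constructor are stored as defaults.
        self.conn_secret = conn_secret
        self.json = json

    def run(self, conn_secret=None, json=None):
        # Arguments passed to run() win; otherwise fall back to the defaults.
        if conn_secret is None:
            conn_secret = self.conn_secret
        if json is None:
            json = self.json
        if conn_secret is None:
            raise ValueError("a connection secret must be supplied")
        if json is None:
            raise ValueError("a job payload must be supplied")
        # Pretend to submit the job and report what would have been used.
        return {"host": conn_secret["host"], "job_id": json["job_id"]}

    def __call__(self, **kwargs):
        # Calling the task forwards keyword arguments to run().
        # (A real flow would defer this call; the toy runs it immediately.)
        return self.run(**kwargs)


conncfg = {"host": "https://xxx.cloud.databricks.com/", "token": "xxx"}
job_json = {"job_id": 24242, "notebook_params": {}}

# Mirrors the failing call: run() is invoked with no secret anywhere.
try:
    ToyRunNow(json=job_json).run()
    failed = False
except ValueError:
    failed = True

# Mirrors the fix: the secret is passed to run() directly.
result = ToyRunNow(json=job_json).run(conn_secret=conncfg)

# Calling the task object is equivalent, since the kwargs reach run().
same_result = ToyRunNow(json=job_json)(conn_secret=conncfg)
```

In this toy, calling .run() by hand inside another function means nothing else fills in the secret for you, which is why it has to be passed explicitly there.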
l
makes sense - I'm still getting my head around tasks/flows/etc . . .
the documentation helps