Bryan
09/23/2024, 7:55 PM

Marvin
09/23/2024, 7:55 PM

### Prefect 2.x Example
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_runs_submit
from prefect_databricks.models.jobs import (
    AutoScale, AwsAttributes, JobTaskSettings, NotebookTask, NewCluster
)


@flow
def jobs_runs_submit_flow(notebook_path, **base_parameters):
    databricks_credentials = DatabricksCredentials.load("my-block")

    # Specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # Specify notebook to use and any parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # Compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    # Submit the job run
    run = jobs_runs_submit(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )
    return run


# Example usage
jobs_runs_submit_flow(notebook_path="/path/to/your/notebook.ipynb", your_param="value")
```
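This assumes a DatabricksCredentials block named my-block has already been saved. A minimal sketch of registering one, with a placeholder instance URL and token:
```python
from prefect_databricks import DatabricksCredentials

# Run once to register the block; replace the placeholders with real values.
DatabricksCredentials(
    databricks_instance="dbc-xxxxxxxx-xxxx.cloud.databricks.com",
    token="<your-databricks-personal-access-token>",
).save("my-block")
```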
### Prefect 3.x Example
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_runs_submit
from prefect_databricks.models.jobs import (
    AutoScale, AwsAttributes, JobTaskSettings, NotebookTask, NewCluster
)


@flow
def jobs_runs_submit_flow(notebook_path, **base_parameters):
    databricks_credentials = DatabricksCredentials.load("my-block")

    # Specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # Specify notebook and any parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # Compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    # Submit the job run
    run = jobs_runs_submit(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )
    return run


# Example usage
jobs_runs_submit_flow(notebook_path="/path/to/your/notebook.ipynb", your_param="value")
```
These snippets show how to set up a Databricks job from a Prefect flow: define the cluster, point a job task at a notebook, and submit the run. The code is the same for Prefect 2.x and 3.x.
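Note that jobs_runs_submit returns once the run has been created, without waiting for it to finish. The collection also provides a jobs_runs_submit_and_wait_for_completion flow that takes the same arguments and polls the run until it reaches a terminal state; a sketch of swapping it into the flow above:
```python
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion

# Inside jobs_runs_submit_flow, replace the jobs_runs_submit call with this;
# it blocks until the Databricks run completes and returns the run's output.
run_output = jobs_runs_submit_and_wait_for_completion(
    databricks_credentials=databricks_credentials,
    run_name="prefect-job",
    tasks=[job_task_settings],
)
```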
For additional information, you can refer to the documentation directly:
- Prefect 2.x: Prefect Databricks Integration
- Prefect 3.x: Prefect Databricks Integration

Bryan
09/23/2024, 7:56 PM

Marvin
09/23/2024, 7:56 PM

You can use the papermill library, which allows you to programmatically execute notebooks. Below is an example for both Prefect 2.x and 3.x:
### Prefect 2.x Example
First, install the required library:
```sh
pip install papermill
```
Then, create a Prefect flow:
```python
from prefect import flow, task
import papermill as pm


@task
def run_notebook(input_path: str, output_path: str, parameters: dict):
    # Execute the notebook and write the executed copy to output_path
    pm.execute_notebook(
        input_path=input_path,
        output_path=output_path,
        parameters=parameters,
    )
    return output_path


@flow
def execute_notebook_flow(input_path: str, output_path: str, parameters: dict):
    notebook_output = run_notebook(input_path, output_path, parameters)
    return notebook_output


# Example usage
execute_notebook_flow(
    input_path="path/to/your/notebook.ipynb",
    output_path="path/to/output/notebook_output.ipynb",
    parameters={"param1": 42, "param2": "hello"},
)
```
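One practical caveat: if this flow runs on a schedule, a fixed output_path overwrites the previous executed notebook. A minimal sketch of deriving a per-run path instead, using a hypothetical timestamped_output helper:
```python
from datetime import datetime, timezone

def timestamped_output(base: str) -> str:
    # Stamp the output filename so each run keeps its own executed notebook
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return base.replace(".ipynb", f"_{stamp}.ipynb")

execute_notebook_flow(
    input_path="path/to/your/notebook.ipynb",
    output_path=timestamped_output("path/to/output/notebook_output.ipynb"),
    parameters={"param1": 42, "param2": "hello"},
)
```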
### Prefect 3.x Example
Similar to 2.x, install the required library:
```sh
pip install papermill
```
Then, create a Prefect flow:
```python
from prefect import flow, task
import papermill as pm


@task
def run_notebook(input_path: str, output_path: str, parameters: dict):
    # Execute the notebook and write the executed copy to output_path
    pm.execute_notebook(
        input_path=input_path,
        output_path=output_path,
        parameters=parameters,
    )
    return output_path


@flow
def execute_notebook_flow(input_path: str, output_path: str, parameters: dict):
    notebook_output = run_notebook(input_path, output_path, parameters)
    return notebook_output


# Example usage
execute_notebook_flow(
    input_path="path/to/your/notebook.ipynb",
    output_path="path/to/output/notebook_output.ipynb",
    parameters={"param1": 42, "param2": "hello"},
)
```
In this setup, the papermill library executes the Jupyter notebook within a Prefect flow: specify the notebook's input and output paths along with any parameters it expects.
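One detail worth noting: papermill injects parameters by looking for a notebook cell tagged parameters and inserting a new cell with the passed values directly after it, so the target notebook needs a tagged defaults cell along these lines:
```python
# In the notebook, tag this cell "parameters" so papermill can override the defaults
param1 = 0          # overridden to 42 by the flow above
param2 = "default"  # overridden to "hello"
```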
For further details, the papermill documentation can be a helpful resource: https://papermill.readthedocs.io/en/latest/