tas — 05/11/2022, 8:10 AM

Anna Geller:
from prefect import task, flow
import re
from google.cloud import dataproc_v1
from google.cloud import storage


@task
def submit_dataproc_job(region, cluster_name, gcs_bucket, spark_filename, project_id):
    # Create the job client pointed at the regional Dataproc endpoint.
    job_client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(region)}
    )
    # Create the job config: run the PySpark script stored in GCS on the given cluster.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": "gs://{}/{}".format(gcs_bucket, spark_filename)
        },
    }
    # Submit the job and block until it completes.
    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()
    # Dataproc job output is saved to the Cloud Storage bucket
    # allocated to the job. Use regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_string()
    )
    print(f"Job finished successfully: {output}\r\n")
@flow
def gcp_flow(your_args):
    submit_dataproc_job(your_args)


if __name__ == "__main__":
    gcp_flow(your_args)
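
For illustration, here is a minimal sketch of how the flow might be parameterized with concrete values instead of the your_args placeholder. All of the values below (project, region, cluster name, bucket, and script name) are hypothetical and would need to be replaced with your own:

from prefect import flow


# Hypothetical defaults for illustration only; substitute your own
# GCP project, region, Dataproc cluster, bucket, and PySpark script.
@flow
def gcp_flow(
    region: str = "us-central1",
    cluster_name: str = "my-dataproc-cluster",
    gcs_bucket: str = "my-spark-bucket",
    spark_filename: str = "my_job.py",
    project_id: str = "my-gcp-project",
):
    # Pass the flow parameters straight through to the Dataproc task.
    submit_dataproc_job(region, cluster_name, gcs_bucket, spark_filename, project_id)


if __name__ == "__main__":
    gcp_flow()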
tas — 05/17/2022, 2:56 PM

Anna Geller: