
tas

05/11/2022, 8:10 AM
Hi team, I have just started exploring Prefect 2.0 and I'm loving it so far. I am still trying to get my head around the concepts, but I'm getting there 🙂. I have a question about launching a PySpark job on GCP Dataproc Serverless. I know that you need an agent subscribed to a work queue in order to pull the job, but in this case do I spin up a Compute Engine (CE) instance and start an agent process there that will then launch a Dataproc Serverless job? Or do I need a custom task runner that does that for me? It doesn't quite make sense to me yet, as I haven't entirely got my head around the concepts.

Anna Geller

05/11/2022, 11:17 AM
So great to hear you love Prefect 2.0! You don't need to build a custom task runner to launch such jobs; you can do it directly from your task. From here it seems GCP has a nice Python API for submitting such jobs, so you could just wrap it in a task and call it from your flow:
from prefect import task, flow
import re

from google.cloud import dataproc_v1
from google.cloud import storage


@task
def submit_dataproc_job(region, cluster_name, gcs_bucket, spark_filename, project_id):
    # Create the job client against the regional Dataproc endpoint.
    job_client = dataproc_v1.JobControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the job config: run the PySpark file stored in GCS on the given cluster.
    job = {
        "placement": {"cluster_name": cluster_name},
        "pyspark_job": {
            "main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"
        },
    }

    # Submit the job and block until the long-running operation completes.
    operation = job_client.submit_job_as_operation(
        request={"project_id": project_id, "region": region, "job": job}
    )
    response = operation.result()

    # Dataproc job output is saved to the Cloud Storage bucket
    # allocated to the job. Use regex to obtain the bucket and blob info.
    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

    output = (
        storage.Client()
        .get_bucket(matches.group(1))
        .blob(f"{matches.group(2)}.000000000")
        .download_as_bytes()
        .decode("utf-8")
    )

    print(f"Job finished successfully: {output}")
    return output


@flow
def gcp_flow(region, cluster_name, gcs_bucket, spark_filename, project_id):
    # Pass the flow parameters straight through to the task.
    submit_dataproc_job(region, cluster_name, gcs_bucket, spark_filename, project_id)


if __name__ == "__main__":
    # Placeholder values: replace with your own region, cluster, bucket, file, and project.
    gcp_flow(
        region="us-central1",
        cluster_name="my-cluster",
        gcs_bucket="my-bucket",
        spark_filename="my_job.py",
        project_id="my-project-id",
    )
cc @Andrew Huang this could be a potentially cool addition to your GCP collection
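The code above targets an existing Dataproc cluster (`placement.cluster_name`), while the original question was about Dataproc Serverless. A minimal sketch of the serverless variant follows, assuming the `google-cloud-dataproc` package with the v1 `BatchControllerClient` is installed. `build_batch_request` and `submit_serverless_batch` are hypothetical helper names, and the request shape is my reading of the `CreateBatchRequest` message, so double-check it against the Dataproc docs before relying on it.

```python
# Sketch only: assumes google-cloud-dataproc (dataproc_v1.BatchControllerClient).
# build_batch_request / submit_serverless_batch are hypothetical helper names.
from typing import Any, Dict, Optional


def build_batch_request(
    project_id: str,
    region: str,
    gcs_bucket: str,
    spark_filename: str,
    batch_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a CreateBatchRequest-shaped dict for a PySpark batch."""
    request: Dict[str, Any] = {
        # Serverless batches live under a project/location, not a cluster.
        "parent": f"projects/{project_id}/locations/{region}",
        "batch": {
            "pyspark_batch": {
                "main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"
            }
        },
    }
    if batch_id is not None:
        # Optional explicit ID; see the Dataproc docs for the allowed format.
        request["batch_id"] = batch_id
    return request


def submit_serverless_batch(request: Dict[str, Any], region: str):
    """Submit the batch to the regional endpoint and wait on the operation."""
    # Imported lazily so build_batch_request works without the SDK installed.
    from google.cloud import dataproc_v1

    client = dataproc_v1.BatchControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )
    operation = client.create_batch(request=request)
    # Blocks until the long-running operation completes; returns the Batch.
    return operation.result()
```

To call it from Prefect, you could decorate `submit_serverless_batch` with `@task` in the same way as `submit_dataproc_job` above; no custom task runner is involved, since the task just makes an API call and GCP provisions the Spark resources.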

tas

05/17/2022, 2:56 PM
Sorry for the late reply; I was not feeling well. Thanks for your response @Anna Geller. I have a bunch of PySpark jobs that I need to orchestrate, and they currently run on a Dataproc cluster. What I am still unsure of is where my Prefect agent will be running. Which flow runner do I use in this scenario? I'm also still confused by the task runner: which option do I pick? More generally, what is the difference between a task runner and a flow runner? For example, if I use a Docker flow runner and a Dask task runner, what will the infrastructure look like? Sorry, but I am very confused 🤷‍♂️

Anna Geller

05/17/2022, 6:58 PM
• task runner: runs your tasks, e.g. submits them to Dask or Ray or to an async event loop
• flow runner: runs your flow, including the infrastructure setup
Sorry, I don't know much about Dataproc; I'd recommend checking their REST API to trigger such tasks. cc @Andrew Huang, who maintains the GCP Collection
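To make the split concrete, here is a toy model in plain Python. It is NOT Prefect's API: `run_flow`, `SequentialRunner`, and `ThreadPoolRunner` are hypothetical names made up for illustration. The "flow runner" decides where the whole flow run executes (here just the current process; in Prefect, e.g. a subprocess or a Docker container), while the "task runner" decides how the task calls inside that run are executed (one after another, or handed to a pool that can run them concurrently, which is roughly the role Dask or Ray play). Roughly, in a Docker flow runner + Dask task runner setup, the container is the flow runner's part, and Dask is where the tasks inside that flow run get sent.

```python
# Toy model only, NOT Prefect's API: all class/function names are hypothetical.
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


class SequentialRunner:
    """Toy task runner: runs each task immediately, one after another."""

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        fut: Future = Future()
        fut.set_result(fn(*args))  # run now, wrap the result in a Future
        return fut

    def shutdown(self) -> None:
        pass


class ThreadPoolRunner:
    """Toy task runner: hands tasks to a thread pool so they can overlap."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        return self._pool.submit(fn, *args)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


def run_flow(flow_fn: Callable[[Any], Any], task_runner: Any) -> Any:
    """Toy flow runner: owns the environment of one flow run.

    A real flow runner might start a container here; this toy just
    calls the flow function in-process and cleans up the task runner.
    """
    try:
        return flow_fn(task_runner)
    finally:
        task_runner.shutdown()


def submit_job(name: str) -> str:
    # Stand-in for a task such as submit_dataproc_job; it just echoes the name.
    return f"{name}: done"


def my_flow(task_runner: Any) -> List[str]:
    # The flow body only submits tasks; the task runner decides how they run.
    futures = [task_runner.submit(submit_job, n) for n in ["job-a", "job-b", "job-c"]]
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_flow(my_flow, SequentialRunner()))
    print(run_flow(my_flow, ThreadPoolRunner(max_workers=2)))
```

Both runs return the same results; only the execution strategy differs, which is why the task runner can be swapped without touching the flow body.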
šŸ‘ 1