# prefect-community
Hi everyone, I was wondering what the best way would be to submit a Spark job to a Dataproc cluster through Prefect. The way I'm doing it right now (even though I haven't executed it yet) is as follows. I have these items baked into the Dockerfile for the Dask workers:
• the gcloud command-line tools
• the spark-snowflake credentials JSON file
• a copy of the main Spark job Python file
Is this the correct approach?
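To convince myself the worker image is set up right, I was thinking of adding a small check task like the sketch below before the real submission. The file paths here are just my assumptions about where the Dockerfile copies things; adjust as needed:

```python
import os
import shutil

from prefect import task


@task
def check_worker_image():
    """Sanity-check that the Dataproc prerequisites baked into the image are present."""
    assert shutil.which("gcloud"), "gcloud CLI not found on the worker"
    # These paths are assumptions about where the Dockerfile places the files.
    for path in ("key.json", "feature_lib_creds.json", "py_files/spark/batch_loader.py"):
        assert os.path.exists(path), f"missing {path} in the worker image"
```

That way a misconfigured worker image fails fast instead of partway through a gcloud call.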
Then I have a Prefect task defined like so:
```python
import subprocess

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


class ThinkNumCallDataProcTask(Task):
    """Submit the ThinkNum PySpark batch loader to Dataproc via the gcloud CLI."""

    def __init__(
        self,
        dataset_id: str = None,
        date: str = None,
        **kwargs
    ):
        self.dataset_id = dataset_id
        self.date = date
        super().__init__(**kwargs)

    @defaults_from_attrs("dataset_id", "date")
    def run(self, dataset_id, date):
        db_name = f'thinknum_{dataset_id}'
        file_name = f'thinknum-{dataset_id}-{date}.csv'.replace('_', '-')
        gcs_input_path = f'gs://alternative_data/thinknum/{dataset_id}/{date}/{file_name}'

        # check_output raises CalledProcessError (and so fails the task) if gcloud exits non-zero.
        subprocess.check_output(
            ['gcloud', 'dataproc', 'jobs', 'submit', 'pyspark',
             '--cluster', 'feature-lib-cluster',
             '--region', 'us-east1',
             '--jars', 'gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,'
                       'gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar,'
                       'gs://dataproc-featurelib/spark-lib/gcs-connector-hadoop3-2.0.1.jar',
             '--files', 'key.json,feature_lib_creds.json',
             'py_files/spark/batch_loader.py', '--',
             gcs_input_path, db_name
             ],
            stderr=subprocess.PIPE
        )
```
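And then I'm planning to wire it into a flow and run it on the Dask workers roughly like this. The scheduler address, dataset id, and date below are placeholders, and the DaskExecutor import path may differ depending on the Prefect version:

```python
from prefect import Flow, Parameter
from prefect.engine.executors import DaskExecutor  # import path varies across Prefect versions

submit_thinknum_job = ThinkNumCallDataProcTask()

with Flow("thinknum-dataproc") as flow:
    dataset_id = Parameter("dataset_id")
    date = Parameter("date")
    submit_thinknum_job(dataset_id=dataset_id, date=date)

# Placeholder scheduler address and parameter values.
flow.run(
    executor=DaskExecutor(address="tcp://dask-scheduler:8786"),
    parameters={"dataset_id": "store_visits", "date": "2020-06-01"},
)
```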