Riley Hun
08/05/2020, 4:48 PM

import subprocess

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


class ThinkNumCallDataProcTask(Task):
    def __init__(
        self,
        dataset_id: str = None,
        date: str = None,
        **kwargs
    ):
        self.dataset_id = dataset_id
        self.date = date
        super().__init__(**kwargs)

    @defaults_from_attrs("dataset_id", "date")
    def run(self, dataset_id, date):
        db_name = f'thinknum_{dataset_id}'
        file_name = f'thinknum-{dataset_id}-{date}.csv'.replace('_', '-')
        gcs_input_path = f'gs://alternative_data/thinknum/{dataset_id}/{date}/{file_name}'
        # Submit the PySpark batch loader to the Dataproc cluster. The three
        # --jars URIs are adjacent string literals that concatenate into a
        # single comma-separated argument.
        subprocess.check_output(
            ['gcloud', 'dataproc', 'jobs', 'submit', 'pyspark',
             '--cluster', 'feature-lib-cluster',
             '--region', 'us-east1',
             '--jars', 'gs://dataproc-featurelib/spark-lib/snowflake-jdbc-3.8.0.jar,'
                       'gs://dataproc-featurelib/spark-lib/spark-snowflake_2.11-2.4.13-spark_2.4.jar,'
                       'gs://dataproc-featurelib/spark-lib/gcs-connector-hadoop3-2.0.1.jar',
             '--files', 'key.json,feature_lib_creds.json',
             'py_files/spark/batch_loader.py', '--',
             gcs_input_path, db_name],
            stderr=subprocess.PIPE
        )