Riley Hun
08/04/2020, 5:57 PM
Jim Crist-Harif
08/04/2020, 6:14 PM
Riley Hun
08/04/2020, 6:20 PM
import logging

import google.cloud.logging
from google.cloud.logging.handlers import CloudLoggingHandler
from prefect import Task


class CloudLogging(Task):
    def run(
        self,
        log_name: str = 'etl_logging',
        name: str = None,
        level=logging.INFO
    ):
        """
        :param: log_name (str) = name of logging instance
        :param: name (str) = name of log event
        :param: level (int) = logging level (INFO, DEBUG, ERROR, WARNING)
        :raises:
            - ValueError: if all required arguments haven't been provided
        :return: logging instance for sending logs to StackDriver
        """
        if name is None:
            name = self.__class__.__name__
        lg_client = google.cloud.logging.Client()
        lg_handler = CloudLoggingHandler(lg_client, log_name)
        cloud_logger = logging.getLogger(name)
        cloud_logger.setLevel(level)
        cloud_logger.addHandler(lg_handler)
        return cloud_logger
And then I pass in the logger instance to each of the subsequent tasks in the flow like so:
with Flow("Thinknum ETL") as flow:
    version = Parameter("version", default="20151130")
    bucket_name = Parameter("bucket_name", default="alternative_data")
    dataset_id = Parameter("dataset_id", default="job_listings")
    client_id = EnvVarSecret("client_id")
    client_secret = EnvVarSecret("client_secret")
    gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
    cloud_logging_task = CloudLogging()
    logger = cloud_logging_task(
        log_name='Thinknum_ETL_Logging'
    )
    token_task = ThinkNumGetTokenTask(
        logger=logger
    )
    token = token_task(
        version=version,
        client_id=client_id,
        client_secret=client_secret
    )
Am I doing this correctly? It's not logging anything to Stackdriver. I'm running my flow locally for testing, but it's using a static Dask cluster on GKE as the executor.
Jim Crist-Harif
08/04/2020, 6:24 PM
Riley Hun
08/04/2020, 6:25 PM
Jim Crist-Harif
08/04/2020, 6:31 PM
def ensure_logging():
    logger = prefect.utilities.logging.get_logger()
    if your_logger_isnt_in(logger.handlers):  # placeholder check
        setup_logging()  # placeholder: attach your handler here

@task
def your_task():
    ensure_logging()
    ...
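A minimal, runnable sketch of the pattern above, using only the standard library so it can be tried without Prefect or GCP. `MarkerHandler` is a hypothetical stand-in for `CloudLoggingHandler`, and the logger name `"prefect"` is an assumption about which logger `get_logger()` returns. The idea is that the handler check runs inside the task, so it executes in whichever worker process the task lands on, and it only attaches the handler once per process.

```python
import logging


class MarkerHandler(logging.Handler):
    """Hypothetical stand-in for CloudLoggingHandler; stores formatted records."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(self.format(record))


def ensure_logging(logger_name="prefect"):
    """Attach a MarkerHandler to the named logger unless one is already there."""
    logger = logging.getLogger(logger_name)
    for handler in logger.handlers:
        if isinstance(handler, MarkerHandler):
            return handler  # already configured in this process
    handler = MarkerHandler()
    logger.addHandler(handler)
    return handler


def your_task():
    # Runs on the worker, so the handler is set up where the task executes.
    handler = ensure_logging()
    logging.getLogger("prefect").warning("task ran")
    return handler
```

Calling `your_task()` several times in the same process should leave exactly one `MarkerHandler` on the logger.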
Riley Hun
08/04/2020, 6:35 PM
Jim Crist-Harif
08/04/2020, 6:42 PM
Riley Hun
08/04/2020, 7:39 PM
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'
gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
But I'm not sure it's doing anything. It would be better to have the json service account key to pass into a task, so I can call these functions:
storage.Client.from_service_account_json()
I'm also getting these warnings:
`/Users/rihun/anaconda3/envs/thinknum_etl/lib/python3.7/site-packages/google/auth/_default.py:69: UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. We recommend you rerun gcloud auth application-default login
and make sure a quota project is added. Or you can use service accounts instead. For more information about service accounts, see https://cloud.google.com/docs/authentication/`
warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
def ensure_stackdriver_logging():
    logger = get_logger()
    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
Sample Logs from Stackdriver:
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run...
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success'
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded
I 2020-08-04T23:18:13.547597Z 2020-08-04 16:18:13,547 - prefect.Thinknum ETL - INFO - Waiting for next scheduled run at 2020-08-04T23:19:11.705087+00:00
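One plausible reading of the tripled lines above is that the setup function ran three times in the same process, adding a fresh handler each time. The sketch below reproduces that effect with the standard library only; `CountingHandler`, `unguarded_setup`, and the `demo.duplicates.*` logger names are hypothetical and exist purely for illustration.

```python
import logging


class CountingHandler(logging.Handler):
    """Hypothetical handler that appends each message to a shared list."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


def unguarded_setup(logger, sink):
    # Mirrors a setup function that adds a new handler on every call.
    logger.addHandler(CountingHandler(sink))


def emitted_copies(setup_calls):
    """Return how many copies of one record reach the sink after N setup calls."""
    logger = logging.getLogger(f"demo.duplicates.{setup_calls}")
    logger.handlers.clear()   # start from a clean logger on every run
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output away from the root logger
    sink = []
    for _ in range(setup_calls):
        unguarded_setup(logger, sink)
    logger.info("Flow run SUCCESS")
    return len(sink)
```

Each attached handler emits the record once, so the number of copies equals the number of setup calls.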
Jim Crist-Harif
08/05/2020, 2:17 PM
Ideally, I'm still a little confused about how authentication works in Prefect?
Prefect generally relies on `Secret`s to authenticate, pulling from environment variables, local configuration (in the `.prefect/config.toml` file), or cloud (where they're stored in vault). A secret can contain whatever values you want, so you're free to pass in whatever auth tokens you need.
Riley Hun
08/05/2020, 5:20 PM
def ensure_stackdriver_logging():
    logger = get_logger()
    if hasattr(logger, 'initialized'):
        return logger
    else:
        setattr(logger, 'initialized', True)
    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
    return logger
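On the earlier authentication question (passing a service account key into a task rather than pointing at a file), here is a rough sketch of one way it could look. It assumes the full JSON key is stored as a string in an environment variable; the name `GCP_SERVICE_ACCOUNT_JSON` and both helper functions are hypothetical. In a flow, the string could come from a secret task such as `EnvVarSecret` and be passed to the task as an argument. The Google imports are kept inside `make_storage_client` so the parsing step can be tried without those packages installed.

```python
import json
import os


def load_service_account_info(env_var="GCP_SERVICE_ACCOUNT_JSON"):
    """Parse a service account key stored as a JSON string in an env var."""
    raw = os.environ.get(env_var)
    if not raw:
        raise ValueError(f"{env_var} is not set")
    info = json.loads(raw)
    if info.get("type") != "service_account":
        raise ValueError(f"{env_var} does not look like a service account key")
    return info


def make_storage_client(info):
    """Build a storage client from parsed key info instead of a key file."""
    # Imported lazily; requires google-cloud-storage and google-auth.
    from google.cloud import storage
    from google.oauth2 import service_account

    credentials = service_account.Credentials.from_service_account_info(info)
    return storage.Client(project=info["project_id"], credentials=credentials)
```

Using explicit service account credentials like this should also avoid the end user credentials warning quoted earlier, since the client no longer falls back to the Cloud SDK login.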