Riley Hun

08/04/2020, 5:57 PM
Hi everyone, I'm new to Prefect and I am loving it so far. We will be using it for our production workflows to ingest external data. I have started to build my first dataflow using Prefect, but I have some questions. I feel like I need to demo it though to get some feedback. Is there a Prefect Support Group I can have a VC with to verify a few things? But the items I'm struggling with are:
• Integrating GCP Stackdriver logging
• It takes forever to push some data to GCS Bucket
• Using GCP credentials
Jim Crist-Harif

08/04/2020, 6:14 PM
Hi Riley, Slack is the only support we provide for users, if you have a specific question about any of the above we'd be happy to help.
Riley Hun

08/04/2020, 6:20 PM
Okay thanks Jim! I'll start first with my inquiry regarding logging to GCP stackdriver logging. I have a task defined like this:
class CloudLogging(Task):

    def run(
        self,
        log_name: str = 'etl_logging',
        name: str = None,
        level=logging.INFO
    ):
        """
        :param: log_name (str) = name of logging instance
        :param: name (str) = name of log event
        :param: level (str) = logging level (INFO, DEBUG, ERROR, WARNING):

        :raises:
            - ValueError: if all required arguments haven't been provided

        :return: logging instance for sending logs to StackDriver

        """

        if name is None:
            name = self.__class__.__name__

        lg_client = google.cloud.logging.Client()
        lg_handler = CloudLoggingHandler(lg_client, log_name)
        cloud_logger = logging.getLogger(name)
        cloud_logger.setLevel(level)
        cloud_logger.addHandler(lg_handler)

        return cloud_logger
And then I pass in the logger instance to each of the subsequent tasks in the flow like so:
with Flow("Thinknum ETL") as flow:
    version = Parameter("version", default="20151130")
    bucket_name = Parameter("bucket_name", default="alternative_data")
    dataset_id = Parameter("dataset_id", default="job_listings")
    client_id = EnvVarSecret("client_id")
    client_secret = EnvVarSecret("client_secret")
    gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')

    cloud_logging_task = CloudLogging()
    logger = cloud_logging_task(
        log_name='Thinknum_ETL_Logging'
    )
    token_task = ThinkNumGetTokenTask(
        logger=logger
    )
    token = token_task(
        version=version,
        client_id=client_id,
        client_secret=client_secret
    )
Am I doing this correctly? It's not logging anything to Stackdriver. I'm running my flow locally for testing, but it's using a static Dask cluster on GKE as the executor.
Jim Crist-Harif

08/04/2020, 6:24 PM
Is your goal for all the logs prefect generates to go to stackdriver, or just logging calls you make yourself within your tasks?
Riley Hun

08/04/2020, 6:25 PM
Preferably all logs going to stackdriver
But if not possible, then my logs generated should be sufficient
Jim Crist-Harif

08/04/2020, 6:31 PM
Gotcha. You shouldn't configure a logger as part of a task, since tasks will only run on one node. Loggers don't serialize with their handlers attached, so your configuration isn't being used everywhere. Instead, you should configure the logging once everywhere. I don't think prefect currently has a nice way to do this as part of our config (though we could add this). For now, you might add a hook that adds your handler if not already configured, then call this as part of your tasks. Something like (pseudocode)
def ensure_logging():
    logger = prefect.utilities.logging.get_logger()
    if your_logger_isnt_in(logger.handlers):
        setup_logging()  # attach your handler to the logger here

@task
def your_task():
    ensure_logging()
    ...
This definitely isn't ideal, but should at least get you going. Depending on how you're starting your dask cluster you might make use of dask's plugins to ensure your handlers are configured on worker startup. But I think we should add a config option to add extra handlers to the prefect logger, will create an issue.
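To illustrate the point that loggers don't serialize with their handlers attached, here is a minimal sketch using only the standard library. It assumes Python 3.7 or later, where a `Logger` pickles as a lookup by name; the logger name `serialization_demo` is made up for the example, and other serializers may differ in detail.

```python
import logging
import pickle

# Hypothetical logger name, used only for this demonstration.
logger = logging.getLogger("serialization_demo")
logger.addHandler(logging.StreamHandler())

# On Python 3.7+, pickling a Logger records only "call getLogger(name)".
rebuild, args = logger.__reduce__()
payload = pickle.dumps(logger)

# The handler class never appears in the serialized bytes, so a process
# that unpickles this gets its own logger of that name, without the handler.
handler_travels = b"StreamHandler" in payload
print(rebuild.__name__, args, handler_travels)
```

In the same process the unpickled object is simply the original logger, which is why the problem only shows up once tasks run on separate workers.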
Riley Hun

08/04/2020, 6:35 PM
Okay - thanks Jim. Let me give it a go using your pseudo code as a template
Jim Crist-Harif

08/04/2020, 6:42 PM
Riley Hun

08/04/2020, 7:39 PM
Hi Jim, I can confirm that logging to stackdriver seems to be working just fine now. I will now proceed with my second inquiry which is somewhat related. I'm authenticating w/ GCP using my "end user credentials", but I should probably be using a service account key file. Ideally, I'm still a little confused about how authentication works in Prefect? As it stands, I am doing something like this:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'
gcp_key = EnvVarSecret('GOOGLE_APPLICATION_CREDENTIALS')
But I'm not sure it's doing anything. It would be better to have the json service account key to pass into a task, so I can call these functions:
storage.Client.from_service_account_json()
I'm also getting these warnings:
/Users/rihun/anaconda3/envs/thinknum_etl/lib/python3.7/site-packages/google/auth/_default.py:69: UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. We recommend you rerun `gcloud auth application-default login` and make sure a quota project is added. Or you can use service accounts instead. For more information about service accounts, see https://cloud.google.com/docs/authentication/
  warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
One thing I've noticed is that the logs are being duplicated in Stackdriver. I implemented your interim solution like so:
def ensure_stackdriver_logging():
    logger = get_logger()
    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
Sample Logs from Stackdriver:
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.499848Z 2020-08-04 16:18:13,499 - prefect.TaskRunner - INFO - Task 'ListLoadedThinkNumDates': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.529295Z 2020-08-04 16:18:13,529 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': Starting task run... 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.545171Z 2020-08-04 16:18:13,545 - prefect.TaskRunner - INFO - Task 'get_dates_to_load': finished task run for task with final state: 'Success' 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.546556Z 2020-08-04 16:18:13,546 - prefect.FlowRunner - INFO - Flow run SUCCESS: all reference tasks succeeded 
I 2020-08-04T23:18:13.547597Z 2020-08-04 16:18:13,547 - prefect.Thinknum ETL - INFO - Waiting for next scheduled run at 2020-08-04T23:19:11.705087+00:00
Jim Crist-Harif

08/05/2020, 2:17 PM
It looks like you're not checking if the handler already exists, so if the task is rerun the handler will be added multiple times.
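One way to do that check is to look through `logger.handlers` for an instance of the handler class before adding a new one. The sketch below is only an illustration: `StandInCloudHandler` and `ensure_handler` are hypothetical names, and the stand-in class replaces `CloudLoggingHandler` so the example runs without GCP access.

```python
import logging


class StandInCloudHandler(logging.Handler):
    """Hypothetical stand-in for CloudLoggingHandler; keeps records in memory."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def ensure_handler(logger, handler_type, make_handler):
    """Attach a handler of handler_type unless the logger already has one."""
    for existing in logger.handlers:
        if isinstance(existing, handler_type):
            return existing  # already configured in this process
    handler = make_handler()
    logger.addHandler(handler)
    return handler


# Simulate the same setup function being called by three task runs.
demo_logger = logging.getLogger("handler_check_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
for _ in range(3):
    ensure_handler(demo_logger, StandInCloudHandler, StandInCloudHandler)

demo_logger.info("finished task run")
print(len(demo_logger.handlers), len(demo_logger.handlers[0].records))
```

In the real flow the same pattern would presumably take the Prefect logger, `CloudLoggingHandler` as the type, and a small function that builds the handler and sets its formatter. The check is per process, so each Dask worker still attaches one handler of its own.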
"Ideally, I'm still a little confused about how authentication works in Prefect?"
Prefect generally relies on `Secret`s to authenticate, either pulling from environment variables, local configuration (in the `.prefect/config.toml` file), or cloud (where they're stored in Vault). A secret can contain whatever values you want, so you're free to pass in whatever auth tokens you need.
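Since a secret can hold any value, one option is to store the full contents of the service account JSON key as the secret and parse it inside the task. The sketch below is an assumption-laden illustration using only the standard library: the function name `load_service_account_info`, the variable name `DEMO_GCP_CREDENTIALS`, and the dummy key values are all made up for the example.

```python
import json
import os

# Fields that a service account key file is expected to contain.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email"}


def load_service_account_info(env_var):
    """Parse a service account key stored as a JSON string in an env var."""
    raw = os.environ.get(env_var)
    if raw is None:
        raise KeyError(f"environment variable {env_var!r} is not set")
    info = json.loads(raw)
    missing = REQUIRED_KEYS - info.keys()
    if missing:
        raise ValueError(f"key is missing fields: {sorted(missing)}")
    return info


# Dummy values for demonstration only; a real key comes from GCP.
os.environ["DEMO_GCP_CREDENTIALS"] = json.dumps({
    "type": "service_account",
    "project_id": "demo-project",
    "private_key": "not-a-real-key",
    "client_email": "etl@demo-project.iam.gserviceaccount.com",
})
info = load_service_account_info("DEMO_GCP_CREDENTIALS")
print(info["project_id"])
```

The resulting dict could then be handed to `google.oauth2.service_account.Credentials.from_service_account_info(info)` to build credentials for a GCP client, instead of relying on a key file path being present on every worker.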
Riley Hun

08/05/2020, 5:20 PM
So I'm not really sure how to do that handler check, but I did try something like this, which is similar in idea, but it didn't seem to work:
def ensure_stackdriver_logging():
    logger = get_logger()

    if hasattr(logger, 'initialized'):
        return logger
    else:
        setattr(logger, 'initialized', True)

    lg_client = google.cloud.logging.Client()
    lg_handler = CloudLoggingHandler(lg_client, 'thinknum_etl_logging')
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    lg_handler.setFormatter(formatter)
    logger.setLevel(logging.INFO)
    logger.addHandler(lg_handler)
On the subject of secrets/passwords, is it acceptable to have them stored as environment variables in the Dockerfile of the Dask workers? I've already secured my Dask cluster on GCP.