Carter Kwon

    1 year ago
    Hello, we are having some issues with logging. Some, but not all, of the logs are appearing in the dashboard. Any ideas on how I might troubleshoot this? We are using ECS.
Kevin Kho

    1 year ago
    Hi @Carter Kwon, does this happen all the time or just sometimes? Are you using Dask? Is it the worker logs that aren't appearing while the client logs are? Any insight into which logs are missing?
Carter Kwon

    1 year ago
    It happens consistently with this particular flow, but I haven't noticed it on other flows. This flow is just using the default LocalExecutor. I'm not exactly sure what you mean by worker vs. client logs, but some tasks' print output appears in the logs and other tasks' doesn't. Here is the flow run schematic and some flow code for the functions, with irrelevant lines removed. The get_start_end_time task's print statements appear in the logs, but the other two tasks' print statements do not. Please let me know if I can provide any more info to help.
    @task(nout=2, log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def get_start_end_time(start_date):
        <task logic>
        print(start_time)
        print(end_time)
        return start_time, end_time
    
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def parse_event_responses(event_response_list):
        <task logic>
        print(df.shape)
        return df
    
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def upload_df_to_s3(bucket, key, date, df):
        # Create the s3 resource
        print("Uploading to S3")
        <task logic>
    Some more background... The upload_df_to_s3 task is getting a permission denied error. We've verified that the newest code points to the correct bucket and shouldn't have permission issues. We added some print statements to confirm the newest code is being run, but those print statements are not appearing, so it seems like the flow might not be running the newest code. I looked at the specific flow version's "DETAILS" section in the UI and manually pulled the image it points to from ECR. That image contains the newest code (the print statements and the correct bucket).
    This is what I mean when I say the flow details
Kevin Kho

    1 year ago
    Gotcha. Instead of print, could you use something like:
    import time

    import prefect
    from prefect import task

    @task
    def cde(x):
        logger = prefect.context.get("logger")
        logger.info("This is the third task")
        logger.info("in cde: " + str(x))
        time.sleep(2)
        return x + 3
    so that it gets sent to the logger?
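    (For reference, a minimal sketch of how that task could be wired into a Prefect 1.x flow and run locally to confirm the logger output; the flow name is illustrative, not from the thread:)
    from prefect import Flow

    with Flow("logger-context-example") as flow:
        result = cde(1)

    if __name__ == "__main__":
        flow.run()  # the two logger.info lines should appear in the run logs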
Carter Kwon

    1 year ago
    I can try that. Just to make sure I'm clear: the print statements combined with log_stdout=True should appear in the logs though, right?
Kevin Kho

    1 year ago
    Actually, no, I see what you mean. They should appear. I think what's happening is that ECS is pulling the wrong image. How did you tag the image?
Carter Kwon

    1 year ago
    We use Prefect's default timestamp tagging, so it all looks correct. Here's a less redacted version. It's also part of a CI/CD process with other flows that seem to be working fine, so it would be a little strange if this were the only one with problems.
    I added the logs you mentioned
    logger = prefect.context.get("logger")
    logger.info("This is the <TASK_NAME> task")
    to the top of each task and pushed it. The same tasks that were logging print statements show the new logs, but the other tasks still don't. That does show the most recent code is being run, though, since I just added those new statements.
Kevin Kho

    1 year ago
    Oh, I was testing your setup just now, but I guess you changed it. Just to be clear: the tasks are running and succeeding, but you don't get logs out of them? This is puzzling. Have you tried setting the log level to DEBUG?
Carter Kwon

    1 year ago
    I added those logger statements to see if those other tasks' logs would show up, but they didn't. All of the tasks are running according to the flow schematic I shared. The parse_event_responses() task is finishing successfully but not logging. The upload_df_to_s3() task is failing due to a permissions issue and not logging either. We are pretty puzzled too, as the flow should have the appropriate permissions, but we are having trouble debugging without logs. That's why I was wondering if something strange was happening.
Kevin Kho

    1 year ago
    Yeah, maybe you can try enabling DEBUG-level logs so we can get more info?
Carter Kwon

    1 year ago
    Do you have a code snippet handy showing where I'd want to set the debug level? Would I set it in the flow block or per task? I'm looking at https://docs.prefect.io/core/concepts/logging.html#logging-configuration
Kevin Kho

    1 year ago
    There are two ways. One is to hit the Run button in the UI, scroll down to Advanced Configuration, and select LOG_LEVEL = DEBUG there. The other is through the Run Config, where you can do
    flow.run_config = ECSRun(env={'PREFECT__LOGGING__LEVEL': "DEBUG"})
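    (For reference, in Prefect 1.x the run-config approach typically looks like this before registering the flow; the flow name and the omission of other ECSRun arguments are illustrative, not from the thread:)
    from prefect import Flow
    from prefect.run_configs import ECSRun

    with Flow("my-ecs-flow") as flow:
        ...  # tasks go here

    # Ship DEBUG-level logs for this flow's runs; other ECSRun arguments
    # (image, task role, etc.) are omitted in this sketch.
    flow.run_config = ECSRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})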
Carter Kwon

    1 year ago
    Thanks for the info. Does anything stick out to you? The INFO-level "This is the get_event_responses task" message is the last log that has always been appearing. The final two tasks seem to run, but not log. I should also add that the logs from every task print as expected when running locally.
Kevin Kho

    1 year ago
    Nothing stands out to me. Are you doing something that accidentally overwrites the logger?
Carter Kwon

    1 year ago
    I don't believe so. Here's the full code of the two tasks that aren't logging when running in ECS. It's strange that it works locally.
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def parse_event_responses(event_response_list):
        logger = prefect.context.get("logger")
        logger.info("This is the parse_event_responses task")
    
        df_list = []
        for res in event_response_list:
            for event in res["data"]:
                print("Parsing...." + str(event['event/type']))
                df_list.append(event)
    
        df = pd.json_normalize(df_list)
        df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                      "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
        print(df.shape)
        return df
    
    
    # Upload DataFrame to s3
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def upload_df_to_s3(bucket, key, date, df):
        logger = prefect.context.get("logger")
        logger.info("This is the upload_df_to_s3 task")
        print("Uploading to S3")
    
        s3 = boto3.resource('s3')
        key = key.format(date=date)
        data = bytes(df.to_json(orient="records", lines=True), 'utf-8')
        compressed_data = gzip.compress(data)
    
        try:
            stream = io.BytesIO(compressed_data)
        except TypeError:
            stream = io.BytesIO(compressed_data.encode())
    
        object = s3.Object(bucket, key)
        object.put(Body=stream, ACL="bucket-owner-full-control")
        print(f"[SUCCESS] Wrote {key} to {bucket}.")
    Is it normal for the flow to end on a "Checking flow run state..." message, or should there be DEBUG logs after that?
Kevin Kho

    1 year ago
    Just tested and it’s not normal. This is a flow that intentionally fails
Carter Kwon

    1 year ago
    Strange. I'm not sure how to troubleshoot it further.
Kevin Kho

    1 year ago
    Something is killing your logger. If I had to guess, maybe some import replaces it, or a variable is assigned with the same name? The last place you have logs would be the clue.
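    (For illustration, a few hypothetical snippets of the kind of thing that can silence a Prefect 1.x logger if pulled in via an import or a stray assignment; none of these are taken from the flow in question:)
    import logging

    # Hypothetical: a global level filter suppresses INFO and below for every
    # logger in the process, including the one Prefect tasks use.
    logging.disable(logging.INFO)

    # Hypothetical: stripping handlers from the "prefect" logger removes the
    # handler that forwards records to the backend, so nothing reaches the UI.
    logging.getLogger("prefect").handlers.clear()

    # Hypothetical: rebinding the name `logger` inside a task means later
    # calls never reach the Prefect logger at all.
    logger = logging.getLogger("some-other-logger")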
Carter Kwon

    1 year ago
    This is the last task that logs
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def get_event_responses(start_time, end_time, api_key):
        logger = prefect.context.get("logger")
        logger.info("This is the get_event_responses task")
    
        event_response_list = []
        offset = 0
    
        while True:
            headers = {"apiKey": api_key}
            response = requests.get(
                "https://<removed>?start={0}&end={1}&offset={2}&limit=250".format(start_time, end_time, offset), headers=headers)
            data = json.loads(response.text)
            event_response_list.append(data)
    
            if len(data["data"]) == 250:
                offset += 250
            else:
                break
    
        return event_response_list
    I don't see anything weird in there. Plus, if something was killing the logger, wouldn't that happen locally as well?
Kevin Kho

    1 year ago
    Me neither. How about the next task? That would be my guess, yeah.
Carter Kwon

    1 year ago
    The next task is the parse_event_responses() one shared above (copied again below). It does not print logs when running in ECS (but does locally).
    @task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
    def parse_event_responses(event_response_list):
        logger = prefect.context.get("logger")
        logger.info("This is the parse_event_responses task")
    
        df_list = []
        for res in event_response_list:
            for event in res["data"]:
                print("Parsing...." + str(event['event/type']))
                df_list.append(event)
    
        df = pd.json_normalize(df_list)
        df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                      "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
        print(df.shape)
        return df
Kevin Kho

    1 year ago
    I’ll try to replicate with print statements and log_stdout=True in various combinations.
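    (For reference, a minimal replication sketch along those lines; the flow and task names are illustrative, not from the thread:)
    import prefect
    from prefect import task, Flow

    @task(log_stdout=True)
    def print_with_capture():
        # With log_stdout=True, stdout is redirected to the task's Prefect logger
        print("print + log_stdout=True")

    @task
    def print_without_capture():
        # Default behavior: stdout is not captured, so this only shows up in
        # the container/agent output, not the Prefect logs
        print("print with the default log_stdout=False")

    @task
    def logger_only():
        prefect.context.get("logger").info("logger.info from a task")

    with Flow("log-stdout-replication") as flow:
        print_with_capture()
        print_without_capture()
        logger_only()

    if __name__ == "__main__":
        flow.run()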