# ask-community
c
Hello, we are having some issues with logging. Some, but not all, of the logs are appearing in the dashboard. Any ideas how I might troubleshoot this? We are using ECS.
k
Hi @Carter Kwon, does this happen all the time or just sometimes? Are you using Dask? Is it the worker logs that aren't appearing while the client logs are? Any insight as to which logs are missing?
c
It happens consistently with this particular flow, but I haven't noticed it on other flows. This flow is just using the default LocalExecutor. I'm not exactly sure what you mean by worker vs. client logs, but some tasks print their logs and others don't. Here is the flow run schematic and some flow code for the functions, with irrelevant lines removed. The get_start_end_time task's print statements appear in the logs, but the other two tasks' print statements do not. Please let me know if I can provide any more info to help.
@task(nout=2, log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def get_start_end_time(start_date):
    <task logic>
    print(start_time)
    print(end_time)
    return start_time, end_time

@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    <task logic>
    print(df.shape)
    return df

@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def upload_df_to_s3(bucket, key, date, df):
    # Create the s3 resource
    print("Uploading to S3")
    <task logic>
Some more background... The upload to S3 task is getting a permission denied error. We've verified that the newest code is pointing to the correct bucket and shouldn't have permission issues. We added some print statements to ensure the newest code is being run, but those print statements are not appearing. It seems like the flow might not be running the newest code. I looked at the specific flow version's "DETAILS" section in the UI and manually pulled the image it's pointing to from ECR. That image contains the newest code (print statements and correct bucket).
This is what I mean when I say the flow details
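One thing that can help narrow down the permission-denied side is confirming which IAM identity the code is actually running under inside the ECS task; a rough sketch using boto3 (a hypothetical helper, not part of the original flow):

import boto3
from prefect import task

# Sketch: print the caller identity so the ECS task's effective IAM role
# can be compared against the role that is supposed to have bucket access.
@task(log_stdout=True)
def whoami():
    identity = boto3.client("sts").get_caller_identity()
    print(identity["Arn"])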
k
Gotcha. Instead of print, could you use something like:
import time

import prefect
from prefect import task

@task
def cde(x):
    logger = prefect.context.get("logger")
    logger.info("This is the third task")
    logger.info('in cde: ' + str(x))
    time.sleep(2)
    return x+3
so that it gets sent to the logger?
c
I can try that. Just to make sure I'm clear, the print statements combined with log_stdout=True should appear in the logs though, right?
k
Actually, no, I see what you mean. I think they should appear. I think what's happening is that ECS is pulling the wrong image. How did you tag the image?
c
We use the default Prefect timestamp tagging, so it looks like it's all correct. Here's a less redacted version. It's also part of a CI/CD process with other flows that seem to be working fine, so it would be a little strange if this were the only one with problems.
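For reference, one way to rule out tag confusion is to pin an explicit image tag instead of the default timestamp tag; a minimal sketch assuming Prefect 1.x Docker storage, with placeholder registry and image names:

from prefect.storage import Docker

# Sketch: an explicit image_tag makes it easy to compare the tag in the ECS
# task definition against the image that CI/CD actually pushed to ECR.
# The registry_url and image_name values are placeholders.
flow.storage = Docker(
    registry_url="<account>.dkr.ecr.<region>.amazonaws.com",
    image_name="my-flow",
    image_tag="my-explicit-tag",
)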
I added the logs you mentioned
logger = prefect.context.get("logger")
logger.info("This is the <TASK_NAME> task")
to the top of each task and pushed it. The same tasks that were logging print statements are showing the new logs, but the other tasks still aren't. This shows the most recent code is being run, though, since I just added those new statements.
k
Oh, I was testing your setup just now, but I guess you changed it. Just to be clear: the tasks are running and successful, but you don't get logs out of them? This is puzzling. Have you tried setting the log level to DEBUG?
c
I added those logger statements to see if those other tasks' logs would show up, but they didn't. All of the tasks are running according to the flow schematic I shared. The parse_event_responses() task is finishing successfully and not logging. The upload_df_to_s3() task is failing due to a permissions issue and not logging either. We are pretty puzzled too, as the flow should have the appropriate permissions, but we are having trouble debugging without logs. That's why I was wondering if something strange was happening.
k
Yeah, maybe you can try turning on DEBUG-level logging so we can get more info?
c
Do you have a code snippet handy of where I'd want to set the debug level? Would I set it in the flow block or per task? I'm looking at https://docs.prefect.io/core/concepts/logging.html#logging-configuration
k
Two ways. One is you can hit the Run button in the UI, scroll down to Advanced Configuration, and select LOG_LEVEL = DEBUG there. The other is through the run config, where you can do:
flow.run_config = ECSRun(env={'PREFECT__LOGGING__LEVEL': "DEBUG"})
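A slightly fuller sketch of the run-config route, assuming Prefect 1.x and the ECSRun run config (any other ECSRun arguments stay as whatever the flow already uses):

from prefect.run_configs import ECSRun

# Sketch: enable DEBUG logging for flow runs on ECS via an environment variable.
flow.run_config = ECSRun(
    env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
)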
c
Thanks for the info. Does anything stick out to you? The INFO-level "This is the get_event_responses task" message is the last one that has always been logging. The final two tasks seem to run, but not log. I should also add that the logs from every task print as expected when running locally.
k
Nothing stands out to me. Are you doing something that accidentally overwrites the logger?
c
I don't believe so. Here's the full code of the two tasks that aren't writing logs while running in ECS. It's strange that it works locally.
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    logger = prefect.context.get("logger")
    logger.info("This is the parse_event_responses task")

    df_list = []
    for res in event_response_list:
        for event in res["data"]:
            print("Parsing...." + str(event['event/type']))
            df_list.append(event)

    df = pd.json_normalize(df_list)
    df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                  "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
    print(df.shape)
    return df


# Upload DataFrame to s3
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def upload_df_to_s3(bucket, key, date, df):
    logger = prefect.context.get("logger")
    logger.info("This is the upload_df_to_s3 task")
    print("Uploading to S3")

    s3 = boto3.resource('s3')
    key = key.format(date=date)
    data = bytes(df.to_json(orient="records", lines=True), 'utf-8')
    compressed_data = gzip.compress(data)

    try:
        stream = io.BytesIO(compressed_data)
    except TypeError:
        stream = io.BytesIO(compressed_data.encode())

    object = s3.Object(bucket, key)
    object.put(Body=stream, ACL="bucket-owner-full-control")
    print(f"[SUCCESS] Wrote {key} to {bucket}.")
Is it normal that the flow would end on a "Checking flow run state..." line, or should there be DEBUG logs that come after that?
k
Just tested and it’s not normal. This is a flow that intentionally fails
c
Strange. I'm not sure how to troubleshoot it further.
k
Something is killing your logger. If I had to guess, maybe some import that replaces it, or a variable assigned with the same name? The point where you last have logs would be the clue.
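One way to check that guess is to inspect the state of the Prefect logger from inside a task; a rough sketch (the task name and printed fields are just illustrative):

import logging

from prefect import task

# Sketch: a throwaway diagnostic task to see whether something in the image
# has disabled or reconfigured the "prefect" logger by the time tasks run.
@task(log_stdout=True)
def inspect_logger_state():
    prefect_logger = logging.getLogger("prefect")
    print("disabled:", prefect_logger.disabled)
    print("level:", logging.getLevelName(prefect_logger.level))
    print("handlers:", prefect_logger.handlers)
    print("propagate:", prefect_logger.propagate)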
c
This is the last task that logs
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def get_event_responses(start_time, end_time, api_key):
    logger = prefect.context.get("logger")
    logger.info("This is the get_event_responses task")

    event_response_list = []
    offset = 0

    while True:
        headers = {"apiKey": api_key}
        response = requests.get(
            "https://<removed>?start={0}&end={1}&offset={2}&limit=250".format(start_time, end_time, offset), headers=headers)
        data = json.loads(response.text)
        event_response_list.append(data)

        if len(data["data"]) == 250:
            offset += 250
        else:
            break

    return event_response_list
I don't see anything weird in there. Plus, if something was killing the logger, wouldn't that happen locally as well?
k
Me neither. How about the next task? That would be my guess, yeah.
c
The next task is the parse_event_responses() one shared above (copied again below). This does not print logs when running in ECS (but does locally).
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    logger = prefect.context.get("logger")
    logger.info("This is the parse_event_responses task")

    df_list = []
    for res in event_response_list:
        for event in res["data"]:
            print("Parsing...." + str(event['event/type']))
            df_list.append(event)

    df = pd.json_normalize(df_list)
    df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                  "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
    print(df.shape)
    return df
k
I’ll try to replicate with prints and log_stdout=True in various combinations.
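A minimal repro sketch along those lines, assuming Prefect 1.x (flow and task names here are placeholders, not from the original flow):

import prefect
from prefect import Flow, task

# Sketch: one task relying on print plus log_stdout=True, one task using the
# context logger, so the two logging paths can be compared on the same ECS setup.
@task(log_stdout=True)
def prints_only():
    print("print + log_stdout=True")

@task
def logs_via_context():
    logger = prefect.context.get("logger")
    logger.info("prefect context logger")

with Flow("logging-repro") as flow:
    prints_only()
    logs_via_context()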