Carter Kwon
06/22/2021, 8:29 PM
Kevin Kho
Carter Kwon
06/22/2021, 8:58 PM
The get_start_end_time task's print statements appear in the logs, but the print statements from the other two tasks do not. Please let me know if I can provide any more info to help.
@task(nout=2, log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def get_start_end_time(start_date):
    <task logic>
    print(start_time)
    print(end_time)
    return start_time, end_time

@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    <task logic>
    print(df.shape)
    return df

@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def upload_df_to_s3(bucket, key, date, df):
    # Create the s3 resource
    print("Uploading to S3")
    <task logic>
Carter Kwon
06/22/2021, 9:06 PM
Carter Kwon
06/22/2021, 9:08 PM
Kevin Kho
import prefect

@task
def cde(x):
    logger = prefect.context.get("logger")
    logger.info("This is the third task")
    logger.info('in cde: ' + str(x))
    time.sleep(2)
    return x + 3

so that it gets sent to the logger?
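(For reference, a minimal sketch of the two logging approaches being discussed, assuming Prefect 1.x; the task names and log messages are placeholders.)

import prefect
from prefect import task

# With log_stdout=True, anything the task writes to stdout (e.g. print)
# is captured and forwarded to the Prefect logger at INFO level.
@task(log_stdout=True)
def with_print():
    print("this print should show up in the task run logs")

# Alternatively, grab the logger from prefect.context and log directly.
@task
def with_logger():
    logger = prefect.context.get("logger")
    logger.info("this goes straight to the Prefect logger")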
Carter Kwon
06/22/2021, 9:11 PM
log_stdout=True should appear in the logs though - right?
Kevin Kho
Carter Kwon
06/22/2021, 9:20 PM
Carter Kwon
06/22/2021, 9:24 PM
I added
logger = prefect.context.get("logger")
logger.info("This is the <TASK_NAME> task")
to the top of each task and pushed it. The same tasks that were logging print statements are showing the new logs, but those other tasks still aren't. This shows the most recent code is being run, though, since I just added those new statements.
Kevin Kho
Carter Kwon
06/22/2021, 9:51 PM
The parse_event_responses() task is finishing successfully and not logging. The upload_df_to_s3() task is failing due to a permissions issue and not logging either. We are pretty puzzled too, as the flow should have the appropriate permissions, but we are having trouble debugging without logs. That's why I was wondering if something strange was happening.
Kevin Kho
Carter Kwon
06/22/2021, 9:55 PM
Kevin Kho
There are two ways. One is through the Run button in the UI: scroll down to Advanced Configuration and select LOG_LEVEL = DEBUG there. Two is through the Run Config, where you can do flow.run_config = ECSRun(env={'PREFECT__LOGGING__LEVEL': "DEBUG"})
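(A minimal sketch of that second option, assuming Prefect 1.x with an ECS agent; the flow name and task are placeholders.)

from prefect import Flow, task
from prefect.run_configs import ECSRun

@task(log_stdout=True)
def say_hello():
    print("hello")

with Flow("example-flow") as flow:
    say_hello()

# Raise the logging level to DEBUG for runs launched on ECS.
flow.run_config = ECSRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})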
Carter Kwon
06/22/2021, 10:07 PM
"This is the get_event_responses task" is from the last task that has always been logging. The final two seem to run, but not log. I should also add that the logs from every task print as expected when running locally.
Kevin Kho
Carter Kwon
06/22/2021, 10:34 PM
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    logger = prefect.context.get("logger")
    logger.info("This is the parse_event_responses task")
    df_list = []
    for res in event_response_list:
        for event in res["data"]:
            print("Parsing...." + str(event['event/type']))
            df_list.append(event)
    df = pd.json_normalize(df_list)
    df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                  "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
    print(df.shape)
    return df

# Upload DataFrame to s3
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def upload_df_to_s3(bucket, key, date, df):
    logger = prefect.context.get("logger")
    logger.info("This is the upload_df_to_s3 task")
    print("Uploading to S3")
    s3 = boto3.resource('s3')
    key = key.format(date=date)
    data = bytes(df.to_json(orient="records", lines=True), 'utf-8')
    compressed_data = gzip.compress(data)
    try:
        stream = io.BytesIO(compressed_data)
    except TypeError:
        stream = io.BytesIO(compressed_data.encode())
    object = s3.Object(bucket, key)
    object.put(Body=stream, ACL="bucket-owner-full-control")
    print(f"[SUCCESS] Wrote {key} to {bucket}.")
Carter Kwon
06/22/2021, 10:40 PM
Should I only see "Checking flow run state...", or should there be DEBUG logs that come after that?
Kevin Kho
Carter Kwon
06/22/2021, 10:53 PM
Kevin Kho
Carter Kwon
06/22/2021, 10:58 PM
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def get_event_responses(start_time, end_time, api_key):
    logger = prefect.context.get("logger")
    logger.info("This is the get_event_responses task")
    event_response_list = []
    offset = 0
    while True:
        headers = {"apiKey": api_key}
        response = requests.get(
            "https://<removed>?start={0}&end={1}&offset={2}&limit=250".format(start_time, end_time, offset), headers=headers)
        data = json.loads(response.text)
        event_response_list.append(data)
        if len(data["data"]) == 250:
            offset += 250
        else:
            break
    return event_response_list

I don't see anything weird in there. Plus, if something was killing the logger, wouldn't that happen locally as well?
Kevin Kho
Carter Kwon
06/22/2021, 11:00 PM
It's the parse_event_responses() one shared above (copied again below). This does not print logs when running in ECS (but does locally).
@task(log_stdout=True, max_retries=3, retry_delay=timedelta(seconds=5))
def parse_event_responses(event_response_list):
    logger = prefect.context.get("logger")
    logger.info("This is the parse_event_responses task")
    df_list = []
    for res in event_response_list:
        for event in res["data"]:
            print("Parsing...." + str(event['event/type']))
            df_list.append(event)
    df = pd.json_normalize(df_list)
    df.columns = ["post/body_text", "post/word_count", "system/created-at", "event_id", "event/type", "date/iso", "network_id", "network_name", "community_id", "community/title", "community/saved-lti-fields",
                  "receiver_id", "receiver_username", "receiver_firstname", "receiver_lastname", "receiver_email", "actor_id", "actor_username", "actor_firstname", "actor_lastname", "actor_email"]
    print(df.shape)
    return df
Kevin Kho