Joshua Greenhalgh
12/30/2022, 9:59 AM
even though the previous task, which produces that input as its output and ran before the flow seems to have started again, ran successfully. I wonder if I am running into some problems with checkpointing and not using Results? I would really like the flow to just fail rather than trying to restart itself.
21:02:43
INFO
CloudFlowRunner
Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow'
21:02:44
INFO
CloudTaskRunner
Task 'start_time_param': Starting task run...
21:02:44
INFO
CloudTaskRunner
Task 'start_time_param': Finished task run for task with final state: 'Success'
21:02:44
INFO
CloudTaskRunner
Task 'is_scheduled_param': Starting task run...
21:02:49
INFO
CloudTaskRunner
Task 'is_scheduled_param': Finished task run for task with final state: 'Success'
21:02:53
INFO
CloudTaskRunner
Task 'end_time_param': Starting task run...
21:02:53
INFO
CloudTaskRunner
Task 'end_time_param': Finished task run for task with final state: 'Success'
21:02:53
INFO
CloudTaskRunner
Task 'get_ingestion_period': Starting task run...
21:02:53
INFO
get_ingestion_period
Ingestion period: start=2022-12-29T19:00:00+00:00, end=2022-12-29T20:00:00+00:00
21:02:53
INFO
CloudTaskRunner
Task 'get_ingestion_period': Finished task run for task with final state: 'Success'
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[0]': Starting task run...
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[0]': Finished task run for task with final state: 'Success'
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[1]': Starting task run...
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[1]': Finished task run for task with final state: 'Success'
21:02:55
INFO
CloudTaskRunner
Task 'load_logs': Starting task run...
21:03:04
INFO
load_logs
count_query_result=[[{'field': 'count()', 'value': '517485'}]]
21:03:43
INFO
CloudTaskRunner
Task 'load_logs': Finished task run for task with final state: 'Success'
21:03:43
INFO
CloudTaskRunner
Task 'fname_suffix': Starting task run...
21:03:47
INFO
CloudFlowRunner
Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow'
21:03:47
INFO
CloudTaskRunner
Task 'fname_suffix': Starting task run...
21:03:48
INFO
CloudTaskRunner
Task 'fname_suffix': Finished task run for task with final state: 'Success'
21:03:48
INFO
CloudTaskRunner
Task 'store_logs': Starting task run...
Task 'store_logs': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/usr/local/lib/python3.9/site-packages/flows/cloudwatch_logs_ingestion_flow/tasks.py", line 96, in store_logs
    upload_gziped_json_lines(
  File "/usr/local/lib/python3.9/site-packages/flows/utils.py", line 220, in upload_gziped_json_lines
    writer.write_all(objects)
  File "/usr/local/lib/python3.9/site-packages/jsonlines/jsonlines.py", line 545, in write_all
    return sum(self.write(obj) for obj in iterable)
TypeError: 'NoneType' object is not iterable
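For reference, the last frame of that traceback can be reproduced without Prefect or jsonlines. This is only a minimal sketch: `write_all` here is a stand-in that mirrors the `sum(... for obj in iterable)` line shown in the traceback, and the `store_logs` guard with its error message is hypothetical, not the actual task code. It does not explain why the upstream value is None after the flow restarts; it only shows how the task could fail with a clearer message.

```python
from typing import Iterable, Optional


def write_all(objects: Iterable[dict]) -> int:
    """Stand-in for the write_all call in the traceback: iterate and count items."""
    return sum(1 for _ in objects)


def store_logs(logs: Optional[Iterable[dict]]) -> int:
    """Hypothetical guard: fail fast when the upstream task's value is missing."""
    if logs is None:
        raise ValueError("expected the output of load_logs, got None")
    return write_all(logs)


# Passing None straight through reproduces the error from the traceback.
try:
    write_all(None)
except TypeError as exc:
    error_message = str(exc)

# With real data the guarded version behaves like the unguarded one.
written = store_logs([{"message": "a"}, {"message": "b"}])
```

With a guard like this, the task run would end in a failed state with an explicit message about the missing input instead of a `TypeError` raised deep inside the writer.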
The object it complains about being None is the output of the task load_logs, which seems to have succeeded:
21:03:43
INFO
CloudTaskRunner
Task 'load_logs': Finished task run for task with final state: 'Success'
Christopher Boyd
12/30/2022, 1:19 PM
Joshua Greenhalgh
12/30/2022, 3:32 PM
Christopher Boyd
12/30/2022, 3:42 PM