https://prefect.io logo
Title
j

Joshua Greenhalgh

12/30/2022, 9:59 AM
Hi wonder if anyone can help - I have a flow (prefect v1 on cloud) that seems to have started, run some of its tasks and then started again (logs in the thread) - when it starts again a task input seems to be
None
even though the previous task producing that input as it's output (which ran before the flow seems to have started again) ran successfully - I wonder if I am running into some problems with checkpointing and not using Results? I would really like the flow to just fail rather than trying to restart itself?
21:02:43
INFO
CloudFlowRunner
Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow'
21:02:44
INFO
CloudTaskRunner
Task 'start_time_param': Starting task run...
21:02:44
INFO
CloudTaskRunner
Task 'start_time_param': Finished task run for task with final state: 'Success'
21:02:44
INFO
CloudTaskRunner
Task 'is_scheduled_param': Starting task run...
21:02:49
INFO
CloudTaskRunner
Task 'is_scheduled_param': Finished task run for task with final state: 'Success'
21:02:53
INFO
CloudTaskRunner
Task 'end_time_param': Starting task run...
21:02:53
INFO
CloudTaskRunner
Task 'end_time_param': Finished task run for task with final state: 'Success'
21:02:53
INFO
CloudTaskRunner
Task 'get_ingestion_period': Starting task run...
21:02:53
INFO
get_ingestion_period
Ingestion period: start=2022-12-29T19:00:00+00:00, end=2022-12-29T20:00:00+00:00
21:02:53
INFO
CloudTaskRunner
Task 'get_ingestion_period': Finished task run for task with final state: 'Success'
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[0]': Starting task run...
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[0]': Finished task run for task with final state: 'Success'
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[1]': Starting task run...
21:02:54
INFO
CloudTaskRunner
Task 'get_ingestion_period[1]': Finished task run for task with final state: 'Success'
21:02:55
INFO
CloudTaskRunner
Task 'load_logs': Starting task run...
21:03:04
INFO
load_logs
count_query_result=[[{'field': 'count()', 'value': '517485'}]]
21:03:43
INFO
CloudTaskRunner
Task 'load_logs': Finished task run for task with final state: 'Success'
21:03:43
INFO
CloudTaskRunner
Task 'fname_suffix': Starting task run...
21:03:47
INFO
CloudFlowRunner
Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow'
21:03:47
INFO
CloudTaskRunner
Task 'fname_suffix': Starting task run...
21:03:48
INFO
CloudTaskRunner
Task 'fname_suffix': Finished task run for task with final state: 'Success'
21:03:48
INFO
CloudTaskRunner
Task 'store_logs': Starting task run...
Important things to note; 21:02:43 INFO CloudFlowRunner Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow' followed by 21:03:47 INFO CloudFlowRunner Beginning Flow run for 'cloudwatch_logs_ingestion_page_flow'
And then the exception I get is the following;
Task 'store_logs': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.9/site-packages/flows/cloudwatch_logs_ingestion_flow/tasks.py", line 96, in store_logs
    upload_gziped_json_lines(
  File "/usr/local/lib/python3.9/site-packages/flows/utils.py", line 220, in upload_gziped_json_lines
    writer.write_all(objects)
  File "/usr/local/lib/python3.9/site-packages/jsonlines/jsonlines.py", line 545, in write_all
    return sum(self.write(obj) for obj in iterable)
TypeError: 'NoneType' object is not iterable
Where the object it is complaining is None - is the output of the task
load_logs
which seems to have succeded; 21:03:43 INFO CloudTaskRunner Task 'load_logs': Finished task run for task with final state: 'Success'
c

Christopher Boyd

12/30/2022, 1:19 PM
Hi Joshua, What version of prefect are you using? How are these flows scheduled to start? Do you have any prefect automations configured in your project?
j

Joshua Greenhalgh

12/30/2022, 3:32 PM
v1 - they run on a cron schedule - running as jobs in k8s - the pod was killed cos the cluster was shifting pods to a different node - but in that situation I would just like it to die - yeah I have some SLAs but don't think they matter for this?
I assume it tries to pull a localresult off the filesystem but its now on another node and its not there?
c

Christopher Boyd

12/30/2022, 3:42 PM
Unfortunately, I think we’ve seen a few cases like this, and there isn’t currently an easy solution for infrastructure events like this with kubernetes: https://github.com/PrefectHQ/prefect/issues/7116
if the the tasks complete, you can persist them manually somehere and reference that state / result instead of local results on the pod filesystem