Jason Thomas
07/13/2022, 12:11 PM
AwaitingRetry
- retry 1:
  - flow filters the list, leaving 1 file to process
  - flow iterates over the filtered list, but does not call task
  - flow finishes in state Completed
I’m guessing task is not called on the retry because the result has been cached and there is no reason to try again. That’s fine. However, I expected the final state of the task to be Failed. Is this behaving as intended?
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from app import log

class FileClass():
    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return (self.status == 'Processed')

    def __repr__(self):
        return (f"FileClass(name: {self.name}, status: {self.status})")

@task
def t1(file):
    try:
        log(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        log(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        log(f"success: {file.name}")
    finally:
        log(f"new status for {file.name}: {file.status}")

@flow(task_runner=SequentialTaskRunner,
      retries=3, retry_delay_seconds=2)
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    log(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    log(f"Processed: {files_to_process}")

if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)
Anna Geller
07/13/2022, 12:13 PM
Jason Thomas
07/13/2022, 12:16 PM
get_run_logger. Don’t think it’s involved, but I’ll replace it and rerun.
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner

class FileClass():
    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return (self.status == 'Processed')

    def __repr__(self):
        return (f"FileClass(name: {self.name}, status: {self.status})")

@task
def t1(file):
    try:
        get_run_logger().info(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        get_run_logger().info(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        get_run_logger().info(f"success: {file.name}")
    finally:
        get_run_logger().info(f"new status for {file.name}: {file.status}")

@flow(task_runner=SequentialTaskRunner,
      retries=3, retry_delay_seconds=2)
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    get_run_logger().info(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    get_run_logger().info(f"Processed: {files_to_process}")
    # if not all(f.is_processed() for f in files_to_process):
    #     raise Exception(f"Not all files are complete.")

if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)
And the logs:
06:17:01.151 | INFO | prefect.flow_runs - Using task runner 'SequentialTaskRunner'
06:17:01.201 | WARNING | prefect.flow_runs - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
06:17:01.319 | INFO | prefect.flow_runs - Processing: [FileClass(name: file_0, status: Found), FileClass(name: file_2, status: Found), FileClass(name: file_4, status: Found), FileClass(name: file_6, status: Found)]
06:17:01.416 | INFO | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:01.503 | INFO | prefect.task_runs - trying: file_0
06:17:01.504 | INFO | prefect.task_runs - success: file_0
06:17:01.505 | INFO | prefect.task_runs - new status for file_0: Processed
06:17:01.584 | INFO | prefect.task_runs - Finished in state Completed()
06:17:01.652 | INFO | prefect.flow_runs - Created task run 't1-daa10d70-1' for task 't1'
06:17:01.723 | INFO | prefect.task_runs - trying: file_2
06:17:01.724 | INFO | prefect.task_runs - success: file_2
06:17:01.725 | INFO | prefect.task_runs - new status for file_2: Processed
06:17:01.796 | INFO | prefect.task_runs - Finished in state Completed()
06:17:01.863 | INFO | prefect.flow_runs - Created task run 't1-daa10d70-2' for task 't1'
06:17:01.934 | INFO | prefect.task_runs - trying: file_4
06:17:01.935 | INFO | prefect.task_runs - error on file_4: Invalid character: 4
06:17:01.936 | INFO | prefect.task_runs - new status for file_4: Failed
06:17:01.938 | ERROR | prefect.task_runs - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 28, in t1
    raise e
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 24, in t1
    raise Exception(f"Invalid character: {4}")
Exception: Invalid character: 4
06:17:02.044 | ERROR | prefect.task_runs - Finished in state Failed('Task run encountered an exception.')
06:17:02.112 | INFO | prefect.flow_runs - Created task run 't1-daa10d70-3' for task 't1'
06:17:02.186 | INFO | prefect.task_runs - trying: file_6
06:17:02.187 | INFO | prefect.task_runs - success: file_6
06:17:02.188 | INFO | prefect.task_runs - new status for file_6: Processed
06:17:02.262 | INFO | prefect.task_runs - Finished in state Completed()
06:17:02.263 | INFO | prefect.flow_runs - Processed: [FileClass(name: file_0, status: Processed), FileClass(name: file_2, status: Processed), FileClass(name: file_4, status: Failed), FileClass(name: file_6, status: Processed)]
06:17:02.363 | INFO | prefect.flow_runs - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
06:17:03.463 | INFO | prefect.flow_runs - Processing: [FileClass(name: file_4, status: Failed)]
06:17:03.499 | INFO | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:03.573 | INFO | prefect.flow_runs - Processed: [FileClass(name: file_4, status: Failed)]
06:17:03.630 | INFO | prefect.flow_runs - Finished in state Completed('All states completed.')
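The retry in the logs above starts from a one-element list because the first attempt already mutated the in-memory objects. Here is a minimal plain-Python sketch of that effect, with no Prefect involved. `File`, `process`, and `attempt` are hypothetical stand-ins for `FileClass`, `t1`, and one run of `f1`, and `ensure_all_processed` mirrors the check in the commented-out lines of the flow. It does not model why the task was not called again on the retry.

```python
class File:
    """Hypothetical stand-in for FileClass from the thread."""

    def __init__(self, name):
        self.name = name
        self.status = "Found"


def process(file):
    # Stand-in for t1: fails on names containing '4' and mutates status either way.
    if "4" in file.name:
        file.status = "Failed"
        raise ValueError(f"Invalid character in {file.name}")
    file.status = "Processed"


def attempt(files):
    # Stand-in for one run of f1: filter, process, report which names were tried.
    todo = [f for f in files if f.status != "Processed"]
    for f in todo:
        try:
            process(f)
        except ValueError:
            pass  # the logs show the flow moving on to file_6 after file_4 failed
    return [f.name for f in todo]


def ensure_all_processed(files):
    # Same idea as the commented-out check: fail loudly if anything is left over.
    pending = [f.name for f in files if f.status != "Processed"]
    if pending:
        raise RuntimeError(f"Not all files are complete: {pending}")


files = [File(n) for n in ["file_0", "file_2", "file_4", "file_6"]]
first = attempt(files)
second = attempt(files)
print(first)   # ['file_0', 'file_2', 'file_4', 'file_6']
print(second)  # ['file_4']
```

Calling `ensure_all_processed(files)` at the end of an attempt raises `RuntimeError` here, which is one way to make the outcome explicit instead of relying on the states of the individual calls.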
Anna Geller
07/13/2022, 12:34 PM
prefect config set PREFECT_LOGGING_LEVEL='DEBUG'
Jason Thomas
07/13/2022, 12:48 PM
Anna Geller
07/13/2022, 12:59 PM
• For each element in the list:
  ◦ check an attribute (is_processed) and skip if it’s already been processed
I'd suggest looking it up in some other way, e.g. even in a JSON block, since modifying class state this way from a flow may lead to undesired behavior if you use e.g. ConcurrentTaskRunner or move to Dask/Ray or any other form of parallelism/concurrency. The ideal is when all tasks and flows are stateless, and the only state is either something that tasks pass between each other as a data dependency or something stored external to the flow run, e.g. as a key-value pair using a JSON Block or some other store (DB, Redis, etc.).
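A rough sketch of the "state stored external to the flow run" idea, using a local JSON file as a stand-in for a JSON Block, DB, or Redis. The helper names (`load_statuses`, `save_status`, `run`) and the file name are hypothetical, and the read-modify-write on the file is not safe under real concurrency; a proper store would be needed for that.

```python
import json
import tempfile
from pathlib import Path


def load_statuses(store: Path) -> dict:
    # Read the whole key-value map; an absent file means nothing is recorded yet.
    return json.loads(store.read_text()) if store.exists() else {}


def save_status(store: Path, name: str, status: str) -> None:
    # Naive read-modify-write; fine for a sequential sketch only.
    statuses = load_statuses(store)
    statuses[name] = status
    store.write_text(json.dumps(statuses))


def process(name: str) -> str:
    # Stateless: returns the new status instead of mutating a shared object.
    return "Failed" if "4" in name else "Processed"


def run(names, store: Path):
    # Look up status in the external store rather than on in-memory objects.
    statuses = load_statuses(store)
    todo = [n for n in names if statuses.get(n) != "Processed"]
    for n in todo:
        save_status(store, n, process(n))
    return todo


store = Path(tempfile.mkdtemp()) / "file_status.json"
names = ["file_0", "file_2", "file_4", "file_6"]
first_pass = run(names, store)
second_pass = run(names, store)
print(first_pass)   # ['file_0', 'file_2', 'file_4', 'file_6']
print(second_pass)  # ['file_4']
```

Because the status lives outside the process, a later run (or a different worker) sees the same picture, which is the property the in-memory `FileClass.status` attribute cannot give.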
Jason Thomas
07/13/2022, 1:07 PM
Anna Geller
07/13/2022, 1:08 PM
update the db table with the new status
you kind of are doing orchestration within an orchestrator 🙂
I believe what you want to accomplish is using task-level retries with some way of knowing which files failed to get processed in order to e.g. investigate why and process those manually later on (perhaps you would check manually and discover it failed to process due to some weird delimiter or a single bad row)
Here's how I would do it - you can replace Slack with any other action based on what you want to do with failed files: https://gist.github.com/c19f05e0549556f5ec79312d6cb08da5
As a result, after all 3 retries per file, only one out of 19 files failed, and I got a message telling me which one - see image
and the final flow run state is Failed even though only one file failed:
15:30:37.565 | ERROR | Flow run 'airborne-rooster' - Finished in state Failed('1/19 states failed.')
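The linked gist is not reproduced here. As a rough plain-Python sketch of the same idea (retry each file on its own, then report which ones still failed), something like the following could work. All names (`process`, `run_with_retries`, `process_all`) are hypothetical, and the simulated failures are made up for illustration. In Prefect the retry loop would normally come from the task decorator's `retries` and `retry_delay_seconds` arguments rather than being hand-written.

```python
import time


def process(name: str, attempt: int) -> None:
    # Hypothetical processing step: 'file_4' always fails,
    # 'file_2' fails once and then succeeds (a transient error).
    if "4" in name:
        raise ValueError(f"Invalid character in {name}")
    if "2" in name and attempt == 0:
        raise ValueError(f"Transient error on {name}")


def run_with_retries(fn, name, retries=3, delay_seconds=0.0):
    # Returns None on success, or the last exception once retries are used up.
    last_error = None
    for attempt in range(retries + 1):
        try:
            fn(name, attempt)
            return None
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                time.sleep(delay_seconds)
    return last_error


def process_all(names, retries=3):
    # Each file is retried independently; only the final failures are collected.
    failed = {}
    for name in names:
        err = run_with_retries(process, name, retries)
        if err is not None:
            failed[name] = str(err)
    return failed


names = ["file_0", "file_2", "file_4", "file_6"]
failed = process_all(names)
print(f"{len(failed)}/{len(names)} files failed: {failed}")
```

The `failed` mapping is what would be sent to Slack or any other notification channel, and raising an exception when it is non-empty would mark the whole run as failed.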
Jason Thomas
07/13/2022, 1:41 PM
Anna Geller
07/13/2022, 6:24 PM