# prefect-community
j
Hi all, on version 2.0b8 I’m playing with flow retries and got a result I didn’t expect. I’ll post code in the thread, but here’s what’s happening:
- flow has one retry
- I call flow, passing in a list of 4 unprocessed ‘files’
- retry 0:
    - flow filters the list to remove any processed files, leaving 4 files to process
    - flow iterates over the filtered list, calling task on each unprocessed file
    - task processes each file
    - on one file, task raises an error
    - flow continues to run, processing 3 out of 4 files
    - flow finishes in state `AwaitingRetry`
- retry 1:
    - flow filters the list, leaving 1 file to process
    - flow iterates over the filtered list, but does not call task
    - flow finishes in state `Completed`

I’m guessing task is not called on the retry because the result has been cached and there is no reason to try again. That’s fine. However, I expected the final state of the task to be `Failed`.

Is this behaving as intended?
Here’s the code
```python
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from app import log

class FileClass():

    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return (self.status == 'Processed')

    def __repr__(self):
        return (f"FileClass(name: {self.name}, status: {self.status})")

@task
def t1(file):
    try:
        log(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        log(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        log(f"success: {file.name}")
    finally:
        log(f"new status for {file.name}: {file.status}")


@flow(task_runner=SequentialTaskRunner,
      retries=3, retry_delay_seconds=2)
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    log(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    log(f"Processed: {files_to_process}")


if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)
```
a
What is this line: `from app import log`? Could you try using the Prefect logger instead?
j
It’s just a wrapper for `get_run_logger`. I don’t think it’s involved, but I’ll replace it and rerun.
No change. Here’s the code:
```python
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner

class FileClass():

    def __init__(self, name):
        self.name: str = name
        self.status: str = 'Found'

    def is_processed(self):
        return (self.status == 'Processed')

    def __repr__(self):
        return (f"FileClass(name: {self.name}, status: {self.status})")

@task
def t1(file):
    try:
        get_run_logger().info(f"trying: {file.name}")
        if '4' in file.name:
            raise Exception(f"Invalid character: {4}")
    except Exception as e:
        file.status = 'Failed'
        get_run_logger().info(f'error on {file.name}: {e}')
        raise e
    else:
        file.status = 'Processed'
        get_run_logger().info(f"success: {file.name}")
    finally:
        get_run_logger().info(f"new status for {file.name}: {file.status}")


@flow(task_runner=SequentialTaskRunner,
      retries=3, retry_delay_seconds=2)
def f1(files):
    files_to_process = [file for file in files if file.status != 'Processed']
    get_run_logger().info(f"Processing: {files_to_process}")
    for file in files_to_process:
        t1(file)
    get_run_logger().info(f"Processed: {files_to_process}")

    # if not all(f.is_processed() for f in files_to_process):
    #     raise Exception(f"Not all files are complete.")


if __name__ == '__main__':
    names = [
        'file_0',
        'file_2',
        'file_4',
        'file_6',
    ]
    files = [FileClass(name) for name in names]
    f1(files)
```
And the logs:
```
06:17:01.151 | INFO    | prefect.flow_runs - Using task runner 'SequentialTaskRunner'
06:17:01.201 | WARNING | prefect.flow_runs - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
06:17:01.319 | INFO    | prefect.flow_runs - Processing: [FileClass(name: file_0, status: Found), FileClass(name: file_2, status: Found), FileClass(name: file_4, status: Found), FileClass(name: file_6, status: Found)]
06:17:01.416 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:01.503 | INFO    | prefect.task_runs - trying: file_0
06:17:01.504 | INFO    | prefect.task_runs - success: file_0
06:17:01.505 | INFO    | prefect.task_runs - new status for file_0: Processed
06:17:01.584 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:01.652 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-1' for task 't1'
06:17:01.723 | INFO    | prefect.task_runs - trying: file_2
06:17:01.724 | INFO    | prefect.task_runs - success: file_2
06:17:01.725 | INFO    | prefect.task_runs - new status for file_2: Processed
06:17:01.796 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:01.863 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-2' for task 't1'
06:17:01.934 | INFO    | prefect.task_runs - trying: file_4
06:17:01.935 | INFO    | prefect.task_runs - error on file_4: Invalid character: 4
06:17:01.936 | INFO    | prefect.task_runs - new status for file_4: Failed
06:17:01.938 | ERROR   | prefect.task_runs - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/engine.py", line 890, in orchestrate_task_run
    result = await run_sync_in_interruptible_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/jason.thomas/dev/sftp_sweeper/.venv_b8/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 28, in t1
    raise e
  File "/Users/jason.thomas/dev/sftp_sweeper/_flow_retry_1.py", line 24, in t1
    raise Exception(f"Invalid character: {4}")
Exception: Invalid character: 4
06:17:02.044 | ERROR   | prefect.task_runs - Finished in state Failed('Task run encountered an exception.')
06:17:02.112 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-3' for task 't1'
06:17:02.186 | INFO    | prefect.task_runs - trying: file_6
06:17:02.187 | INFO    | prefect.task_runs - success: file_6
06:17:02.188 | INFO    | prefect.task_runs - new status for file_6: Processed
06:17:02.262 | INFO    | prefect.task_runs - Finished in state Completed()
06:17:02.263 | INFO    | prefect.flow_runs - Processed: [FileClass(name: file_0, status: Processed), FileClass(name: file_2, status: Processed), FileClass(name: file_4, status: Failed), FileClass(name: file_6, status: Processed)]
06:17:02.363 | INFO    | prefect.flow_runs - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
06:17:03.463 | INFO    | prefect.flow_runs - Processing: [FileClass(name: file_4, status: Failed)]
06:17:03.499 | INFO    | prefect.flow_runs - Created task run 't1-daa10d70-0' for task 't1'
06:17:03.573 | INFO    | prefect.flow_runs - Processed: [FileClass(name: file_4, status: Failed)]
06:17:03.630 | INFO    | prefect.flow_runs - Finished in state Completed('All states completed.')
```
I’m guessing the unexpected outcome is related to the caching behavior, because when I change the code to eliminate caching I get a different result. Edit: Never mind, the change I was referring to still did not make the flow fail.
a
I believe there are several things to consider here:
• to determine the final state of a flow run, you need to either return a state directly or return the task run result you care about, i.e. the one that should determine whether the flow run was successful or not
• you use try/except/finally, which traps exceptions and makes it hard to test the actual Prefect exception handling
• the class FileClass (being a class!) is stateful and may cause side effects; returning the object from a function may make it easier to test the behavior you want
• you need to differentiate between flow-level and task-level retries: the flow retry is about the final state of a flow run, and I believe the file processing as a whole could be better handled at the task level, depending on what you are trying to accomplish in this example
This docs page may help here: https://orion-docs.prefect.io/concepts/flows/#final-state-determination
Debug logs may be helpful too:
```
prefect config set PREFECT_LOGGING_LEVEL='DEBUG'
```
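Anna's first point about final state determination can be made concrete with a plain-Python model of the rules as we read them on the linked docs page: an exception raised in the flow body fails the run, an explicitly returned state (or list of states) decides the outcome, any other return value completes the run, and returning nothing falls back to the states of all task runs in that flow run. `State`, `aggregate`, and `final_state` are hypothetical names; this is a simplification for illustration, not Prefect's engine code.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(frozen=True)
class State:
    """Hypothetical stand-in for a Prefect state: only its type matters here."""
    type: str  # "COMPLETED" or "FAILED"


def aggregate(states: List[State]) -> State:
    # Any failed state fails the group; an empty group counts as completed.
    if any(s.type == "FAILED" for s in states):
        return State("FAILED")
    return State("COMPLETED")


def final_state(raised: bool, returned: Any, task_run_states: List[State]) -> State:
    """Model of the final-state rules, checked in order (not Prefect's implementation)."""
    if raised:  # exception raised directly in the flow function
        return State("FAILED")
    if returned is None:  # no return value: use every task run in this flow run
        return aggregate(task_run_states)
    if isinstance(returned, State):  # an explicitly returned state wins
        return returned
    if isinstance(returned, list) and returned and all(isinstance(s, State) for s in returned):
        return aggregate(returned)  # several returned states: only those count
    return State("COMPLETED")  # any other return value


first_attempt = [State("COMPLETED"), State("COMPLETED"), State("FAILED"), State("COMPLETED")]
print(final_state(False, None, first_attempt))  # State(type='FAILED')
```

Fed the four task-run states of the first attempt in the logs (three Completed, one Failed) and no return value, the model gives FAILED, which matches the "proposing final state 'Failed'" log line; returning a state explicitly would override the task-run aggregate.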
j
Thanks Anna, I’ll look at the docs and consider your points. I have the debug-level logs but didn’t notice anything helpful there; I can post them if you like. Regarding the structure (try/except/finally, flow-level retries, looping in the flow and calling the task on each element), it’s the simplest way I’ve found to get the behavior I want. If there’s a better way, please let me know. Here’s what I’m trying to do:
• Pass in a list containing details of files I want to process.
• For each element in the list:
  ◦ check an attribute (is_processed) and skip the file if it’s already been processed
  ◦ insert or update a row in a db table indicating the current status of the file
  ◦ try to process the file
  ◦ on success:
    ▪︎ update the attribute to indicate processed
  ◦ on error:
    ▪︎ update the attribute to indicate failed
    ▪︎ mark the flow as failed (to trigger a retry), but continue looping
  ◦ on either success or error:
    ▪︎ update the db table with the new status
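The per-file procedure described above, sketched as plain Python to check the control flow: an error is recorded and the loop keeps going, and the batch only counts as failed once every file has been attempted. `process_file`, `run_batch`, and the `status_table` dict (standing in for the db table) are hypothetical; the failure rule ('4' in the name) is borrowed from the example task `t1`.

```python
from typing import Dict, List


class ProcessingError(Exception):
    """Raised when a single file cannot be processed."""


def process_file(name: str) -> None:
    # Hypothetical processing step; fails the same way as the example's t1 task.
    if "4" in name:
        raise ProcessingError(f"Invalid character in {name}")


def run_batch(names: List[str], status_table: Dict[str, str]) -> List[str]:
    """Process every unprocessed file, record each status, return the names that failed."""
    failed: List[str] = []
    for name in names:
        if status_table.get(name) == "Processed":
            continue  # skip files finished on an earlier attempt
        status_table[name] = "Processing"  # insert/update the current status
        try:
            process_file(name)
        except ProcessingError:
            status_table[name] = "Failed"
            failed.append(name)  # remember the failure but keep looping
        else:
            status_table[name] = "Processed"
    return failed


table: Dict[str, str] = {}
failures = run_batch(["file_0", "file_2", "file_4", "file_6"], table)
if failures:
    # In a flow, raising here (after the loop) is one way to end the run as failed.
    print(f"failed: {failures}")  # failed: ['file_4']
```

Keeping the "did anything fail?" decision after the loop is what lets every file be attempted before the run's outcome is decided.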
a
• For each element in the list:
◦ check an attribute (is_processed) and skip if it’s already been processed
I'd suggest looking it up in some other way, e.g. even in a JSON block, since modifying a class's state this way from a flow may lead to undesired behavior if you use e.g. ConcurrentTaskRunner, move to Dask/Ray, or adopt any other form of parallelism/concurrency. The ideal state is when all tasks and flows are stateless, and the only state is either something that tasks pass between each other as a data dependency or something stored externally to the flow run, e.g. as a key-value pair in a JSON Block or some other store (DB, Redis, etc.).
In some ways, your is_processed is like a Completed state in Prefect. Instead of using it, perhaps you can simply check whether the task run's state was successful and continue downstream based on that Prefect state rather than on this class's state.
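A sketch of this advice in plain Python, under stated assumptions: the processing step is a stateless function that raises on failure instead of setting an attribute, the run decides what to do from the per-file outcomes it collects, and the record of processed files lives outside the run in a small JSON file standing in for a JSON block, database, or Redis. `load_done`, `save_done`, `process`, and `run` are hypothetical helpers, not Prefect APIs.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List, Set


def load_done(path: Path) -> Set[str]:
    # Names recorded as processed by earlier runs; no file yet means nothing is done.
    if not path.exists():
        return set()
    return set(json.loads(path.read_text()))


def save_done(path: Path, done: Set[str]) -> None:
    # Persist outside the run so a retry or a later run can see it.
    path.write_text(json.dumps(sorted(done)))


def process(name: str) -> None:
    # Stateless step: reads its input, mutates nothing, signals failure by raising.
    if "4" in name:
        raise ValueError(f"invalid character in {name}")


def run(names: List[str], store: Path) -> Dict[str, bool]:
    done = load_done(store)
    outcomes: Dict[str, bool] = {}
    for name in names:
        if name in done:
            continue  # skip using the external record, not an object attribute
        try:
            process(name)
            outcomes[name] = True
        except ValueError:
            outcomes[name] = False
    # Decide what counts as processed from the collected outcomes.
    done |= {name for name, ok in outcomes.items() if ok}
    save_done(store, done)
    return outcomes


store = Path(tempfile.mkdtemp()) / "processed.json"
names = ["file_0", "file_2", "file_4", "file_6"]
first = run(names, store)
second = run(names, store)
print(second)  # {'file_4': False}
```

Because nothing is stored on shared objects, the same functions behave the same way whether the files are handled sequentially or concurrently; only the external store needs to be safe for concurrent writes.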
j
Thanks Anna, I will take all of this into consideration. But I’m afraid I’ve glossed over the main point here. My main concern is this:
• The flow’s first attempt ended in a failed state, which triggered a retry.
• The flow’s retry had the same outcome as the first attempt, but it ended in a different state.
a
🤔
So IMO, with this step:
```
update the db table with the new status
```
You are kind of doing orchestration within an orchestrator 🙂 I believe what you want is task-level retries plus some way of knowing which files failed to be processed, so that you can e.g. investigate why and process those manually later on (perhaps you would check manually and discover that a file failed because of some weird delimiter or a single bad row). Here's how I would do it; you can replace Slack with any other action you want to take on failed files: https://gist.github.com/c19f05e0549556f5ec79312d6cb08da5 As a result, after all 3 retries per file, only one out of 19 files failed and I got a message telling me which one (see image). The final flow run state is Failed even though only one file failed:
```
15:30:37.565 | ERROR   | Flow run 'airborne-rooster' - Finished in state Failed('1/19 states failed.')
```
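The linked gist is not reproduced here. As a rough plain-Python illustration of the same idea: each file gets its own retries, failures are collected, one notification lists them, and the summary counts failed files. The 19 files and the single permanent failure are chosen to mirror the log line above; `with_retries`, `flaky_process`, and `notify` are hypothetical. In Prefect itself, the per-file retries would come from the task's `retries` / `retry_delay_seconds` settings rather than a hand-written loop.

```python
from typing import Callable, Dict, List


def with_retries(fn: Callable[[str], None], arg: str, retries: int) -> bool:
    """Call fn(arg) up to 1 + retries times; return True on the first success."""
    for _ in range(retries + 1):
        try:
            fn(arg)
            return True
        except Exception:
            continue  # try again until attempts run out
    return False


attempts: Dict[str, int] = {}


def flaky_process(name: str) -> None:
    # Hypothetical: file_7 succeeds on its second try, file_13 never succeeds.
    attempts[name] = attempts.get(name, 0) + 1
    if name == "file_13" or (name == "file_7" and attempts[name] < 2):
        raise RuntimeError(f"could not process {name}")


def notify(failed: List[str]) -> None:
    # Placeholder for Slack or any other action on failed files.
    print(f"Files that failed after retries: {failed}")


names = [f"file_{i}" for i in range(19)]
outcomes = {name: with_retries(flaky_process, name, retries=3) for name in names}
failed = [name for name, ok in outcomes.items() if not ok]
if failed:
    notify(failed)
print(f"{len(failed)}/{len(outcomes)} states failed.")  # 1/19 states failed.
```

A transient failure (file_7 here) is absorbed by the per-file retries, so only a file that keeps failing ends up in the notification and in the failed count.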
j
I’ve simplified my example greatly, and I think I understand better now. In my original example:
• On the flow’s first attempt, when the task processed the ‘fail’ file, it returned an error rather than raising one. Prefect (correctly) interpreted that as a successful task run. Then I raised the error inside the flow, so even though the task succeeded, the flow failed.
• On the retry, Prefect knew that the ‘fail’ file had already been processed, so even though I told it to run the task again, it did not. Since the task was not run, no error was returned and thus no error was raised, so the flow ended in success.
You were 100% correct, Anna: my overly complicated code was the problem. I didn’t appreciate how smart Orion is, and my ‘help’ was getting in the way. This is great news, since I can simplify my approach. Thank you Anna, this was so helpful. I am going to go through all these resources to make sure I get it.
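The difference described in the first bullet, modeled in plain Python: a runner that decides success only by whether the function raised treats a function that *returns* an exception object as successful. `run_task` and its string states are hypothetical stand-ins, not Prefect's engine.

```python
from typing import Any, Callable, Tuple


def run_task(fn: Callable[[], Any]) -> Tuple[str, Any]:
    """Return (state, value): 'Failed' only if fn raises, 'Completed' otherwise."""
    try:
        value = fn()
    except Exception as exc:
        return "Failed", exc
    return "Completed", value


def returns_error() -> Exception:
    return ValueError("bad file")  # handed back as an ordinary value


def raises_error() -> None:
    raise ValueError("bad file")  # propagates, so the run is marked failed


print(run_task(returns_error)[0])  # Completed
print(run_task(raises_error)[0])   # Failed
```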
a
glad to hear I could help 👍