I have run up against this pattern several times in the past couple of weeks, and haven't found a great solution. Essentially, I want to run a flow of subflows where every subflow will run regardless of the completion/failure of previous subflow runs. Here's some example code that processes a single file in a given directory, or runs a batch flow to process all files within a given parent directory:
from prefect import flow, task
from pathlib import Path
from my_functions import do_the_processing, different_processing


@task
def process_file(file_path):
    '''
    Process the file
    '''
    processed_file = do_the_processing(file_path)  # Function not shown
    # more code to process file...
    return processed_file


@task
def create_output(processed_file):
    '''
    Create output from processed file
    '''
    with open('output_n.txt', 'w') as f:
        f.write(processed_file)
    return True


@flow
def something_different_flow(file_result):
    '''
    Do a different set of tasks here and save a different output
    '''
    new_result = different_processing(file_result)  # Function not shown
    # more code calling other tasks to process file...
    create_output(new_result)


@flow
def start_file_flow(this_dir):
    '''
    Process ONE file in a given directory.
    '''
    this_file = Path(this_dir) / 'file.txt'
    processed_file = process_file(this_file)
    create_output(processed_file)
    return processed_file


@flow
def start_batch(root_dir):
    '''
    Process ALL files within the subfolders of a given directory.
    This flow should run start_file for each file, whether or 
    not the previous start_file_flow was successful.
    '''
    for this_dir in Path(root_dir).iterdir():
        file_result = start_file_flow(this_dir)
        # Check result and decide if something else should be done
        if file_result.is_successful():
            something_different_flow(file_result)
The problem arises when running start_batch on a parent folder. If processing of any single file fails for any reason, the rest of the files in the parent folder won't get processed, because start_batch will immediately fail. I initially thought about converting start_file_flow to a task, but that has a couple of problems. I would like both start_batch and start_file_flow to be flows so that I can choose to run either one as a flow. I also need start_file_flow to be a flow so that it can run several tasks concurrently, and a Prefect task can't start other tasks. The most pertinent question/answer thread I've found is here, which would work if I were using tasks and not flows.

I need something like allow_failure, but for a subflow (not a task). My ideal scenario is that I could run start_batch and store the state of every subflow (similar to the linked article) so that all subflows would complete. Then I could check the state/result of each subflow and decide whether to mark the parent start_batch flow as completed or failed at the very end. Is there any mechanism like allow_failure for subflows? Or is there some other programming pattern I can employ that will allow this batch processing pattern to succeed?
Yes, you can use return_state=True to allow failures in a subflow and then compute on the result 🙂
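For example, a minimal sketch of start_batch using return_state=True, assuming Prefect 2.x and reusing the flows from the question:

from pathlib import Path
from prefect import flow


@flow
def start_batch(root_dir):
    states = []
    for this_dir in Path(root_dir).iterdir():
        # return_state=True makes the subflow call return a State object
        # instead of raising when the subflow fails, so the loop continues
        state = start_file_flow(this_dir, return_state=True)
        states.append(state)
        if state.is_completed():
            something_different_flow(state.result())
    # Returning the collected states lets Prefect derive the parent's final
    # state at the very end (failed if any subflow failed)
    return states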
Thanks for the response Robin! 🙂 That will certainly work in some scenarios. But will that still work when something_different_flow runs if start_file_flow were to fail? As I understand it, the parent flow will fail as soon as we access the result of a failed subflow, and in this scenario we need to check the result to see if any further processing is required. So returning the state will work as long as you don't need the result of a subflow. But as soon as we access the result of a failed subflow, the parent flow will also fail. I can't find a way around that... ?
No, it doesn't work like that. I have used it to implement exactly what you want. With return_state=True the call returns a State object that carries the state, the result, etc. You can inspect it safely without cancelling the main flow execution 🙂
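For illustration, a sketch that reads a failed subflow's result without failing the parent (result(raise_on_failure=False) returns the exception instead of re-raising it), and then sets the parent's final state explicitly with Completed/Failed from prefect.states, again assuming Prefect 2.x:

from pathlib import Path
from prefect import flow
from prefect.states import Completed, Failed


@flow
def start_batch(root_dir):
    failures = []
    for this_dir in Path(root_dir).iterdir():
        state = start_file_flow(this_dir, return_state=True)
        if state.is_failed():
            # raise_on_failure=False hands back the underlying exception
            # instead of re-raising it, so the parent keeps running
            failures.append((this_dir, state.result(raise_on_failure=False)))
        else:
            something_different_flow(state.result())
    # Decide the parent's final state at the very end
    if failures:
        return Failed(message=f"{len(failures)} file flow(s) failed")
    return Completed(message="all file flows succeeded")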
Awesome! Thank you 😄