Richard Alexander
02/26/2024, 2:31 PM

from prefect import flow, task, serve, get_run_logger, allow_failure
from pathlib import Path
from my_functions import do_the_processing, different_processing
@task
def process_file(file_path):
    '''
    Process the file
    '''
    processed_file = do_the_processing(file_path)  # Function not shown
    # more code to process file...
    return processed_file

@task
def create_output(processed_file):
    '''
    Create output from processed file
    '''
    with open('output_n.txt', 'w') as f:
        f.write(processed_file)
    return True

@flow
def something_different_flow(file_result):
    '''
    Do a different set of tasks here and save a different output
    '''
    new_result = different_processing(file_result)  # Task not shown
    # more code calling other tasks to process file...
    create_output(new_result)

@flow
def start_file_flow(this_dir):
    '''
    Process ONE file in a given directory.
    '''
    this_file = Path(this_dir) / 'file.txt'
    processed_file = process_file(this_file)
    create_output(processed_file)

@flow
def start_batch(root_dir):
    '''
    Process ALL files within the subfolders of a given directory.
    This flow should run start_file_flow for each file, whether or
    not the previous start_file_flow was successful.
    '''
    for this_dir in Path(root_dir).iterdir():
        file_result = start_file_flow(this_dir)
        # Check result and decide if something else should be done
        if file_result.is_successful():
            something_different_flow(file_result)
The problem arises when running start_batch on a parent folder. If processing on any single file fails for any reason, the rest of the files in the parent folder won't get processed, because start_batch will immediately fail.
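For context, a minimal sketch (assumed Prefect 2.x semantics, not the poster's code) of the default behavior being described: an exception in a subflow propagates to the parent, so later loop iterations never run.

from prefect import flow

@flow
def child(n):
    if n == 2:
        raise ValueError("boom")
    return n

@flow
def parent():
    for n in range(5):
        child(n)  # raises on n == 2, so n == 3 and 4 are never processed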
I initially thought about converting start_file_flow to a task, but that has a couple of problems. I would like both start_batch and start_file_flow to be flows so that I can choose to run either one as a flow. I also need start_file_flow to be a flow so that it can run several tasks concurrently, and a Prefect task can't start other tasks.

The most pertinent question/answer thread I've found is here, which would work if I were using tasks and not flows. I need something like allow_failure, but for a subflow (not a task).
My ideal scenario is that I could run start_batch and store the state of every subflow (similar to the linked article) so that all subflows would complete. Then I could check the state/result of each subflow and decide whether to mark the parent start_batch flow as completed or failed at the very end.

Is there any mechanism like allow_failure for subflows? Or is there some other programming pattern I can employ that will allow this batch processing pattern to succeed?
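One pattern that matches this ask, sketched here under assumed Prefect 2.x semantics (not a confirmed resolution from the thread): call each subflow with return_state=True so the parent receives a State object instead of an exception, collect the states, and return an explicit final state from the parent at the end. start_file_flow and something_different_flow are the poster's flows from the code above.

from pathlib import Path
from prefect import flow
from prefect.states import Completed, Failed

@flow
def start_batch(root_dir):
    failures = []
    for this_dir in Path(root_dir).iterdir():
        # return_state=True hands back a State instead of raising,
        # so a failed subflow does not abort the loop
        state = start_file_flow(this_dir, return_state=True)
        if state.is_completed():
            something_different_flow(state.result())
        else:
            failures.append(this_dir)
    # Returning a state explicitly sets the parent flow run's final state
    if failures:
        return Failed(message=f"{len(failures)} directories failed")
    return Completed(message="All directories processed")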
Robin Niel
02/26/2024, 3:04 PM

Richard Alexander
02/26/2024, 3:38 PM

something_different_flow runs if start_file_flow were to fail? As I understand it, the parent flow will fail as soon as we access the result of a failed subflow. And in this scenario we need to check the result to see if any further processing is required.

So returning state will work as long as you don't need the result of a subflow. But as soon as we access the result of a failed subflow, the parent flow will also fail. I can't find a way around that... ?
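On the result-access concern specifically: State.result accepts raise_on_failure=False (a documented Prefect 2.x option), which returns the subflow's exception object instead of re-raising it, so inspecting a failed subflow's outcome no longer fails the parent. A minimal sketch:

from prefect import flow

@flow
def child():
    raise ValueError("boom")

@flow
def parent():
    state = child(return_state=True)
    # Returns the exception object rather than re-raising it,
    # so the parent flow keeps running
    outcome = state.result(raise_on_failure=False)
    if state.is_failed():
        print(f"child failed with: {outcome!r}")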
Robin Niel
02/26/2024, 3:44 PM

Richard Alexander
02/26/2024, 3:47 PM