
YSF

09/07/2022, 10:36 PM
Looking for advice on a pattern I'm trying to implement in Prefect 1 that behaves like a local filesystem sensor. I don't want to rig it up to Lambda/Event Grid for this use case. I have files arriving in a local filesystem. My thought is to have a first task that runs every 10 minutes and checks whether a new file has arrived. It determines what is "new" by checking a log of processed file names that the last task in the flow updates. I can't move the files themselves to a landing/processed area because this sits on top of an existing process that I can't change. My question is: when the first task runs and sees no new files, how do I make the remaining tasks not execute while the flow and all of its tasks still show as successful?
I understand I need to add a state handler function that manipulates the state to be SKIP, but I don't see any clear examples in the documentation for how to do that.
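The "what is new" check described above can be sketched with the standard library alone. This is a minimal illustration, not the poster's code: `find_new_files`, `mark_processed`, and the one-name-per-line log format are all assumptions made for the example, and the log is assumed to live outside the watched directory.

```python
from pathlib import Path


def find_new_files(watch_dir, processed_log):
    """Return sorted names of files in watch_dir that are not listed in processed_log."""
    log_path = Path(processed_log)
    # A missing log is treated as "nothing has been processed yet".
    processed = set(log_path.read_text().splitlines()) if log_path.exists() else set()
    return sorted(
        entry.name
        for entry in Path(watch_dir).iterdir()
        if entry.is_file() and entry.name not in processed
    )


def mark_processed(processed_log, names):
    """Append the given file names to the log, one per line (hypothetical format)."""
    with open(processed_log, "a") as log:
        for name in names:
            log.write(name + "\n")
```

In Prefect 1, a first task that gets an empty list back could then raise the `SKIP` signal from `prefect.engine.signals`. As far as I know, downstream tasks skip by default when an upstream task is skipped, and skipped states are reported as successful, so that may be enough without a state handler.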

Angel Acosta

09/08/2022, 3:29 PM
Hi Yusuf, I'm doing something very similar. If I run my flow on a directory that has no files to process, the flow is still successful, but it doesn't trigger any flows further down because of how I wrote the code.
import os
import shutil

from prefect import flow

# process_lis_data is the subflow that handles a single file; it is defined elsewhere.


@flow
def process_all(dir):

    input_directory = os.path.join(dir, "input")
    output_directory = os.path.join(dir, "output")
    fail_directory = os.path.join(dir, "fail")
    success_directory = os.path.join(dir, "success")

    # If the input directory is empty, the loop body never runs and the flow simply succeeds.
    for entry in os.scandir(input_directory):
        if entry.is_file():
            input_file_path = entry.path
            input_file_name = entry.name
            try:
                process_lis_data(input_file_path, output_directory)
            except Exception as e:
                # If processing fails, we want to fail the flow and send the file to the fail folder.
                print(e)
                shutil.move(input_file_path, os.path.join(fail_directory, input_file_name))
                raise ValueError(f"processing failed for {input_file_name}, sending to failed folder") from e

            # If it gets here, processing was successful and we can move the file to the success folder.
            shutil.move(input_file_path, os.path.join(success_directory, input_file_name))
The try block is where the subflow begins. But as you can see, if the directory is empty, it never gets to that point.

YSF

09/08/2022, 3:31 PM
Oh interesting, you have it separated as a subflow
And raise a skipped state signal for a first task
But I like your way, I'm going to try that
because then I can reuse that subflow elsewhere
Thank you!
👍 1

Angel Acosta

09/08/2022, 3:41 PM
I'm learning as well, so happy to help where I can. I still have an issue I'm trying to work around with the subflows: I don't want the entire flow to fail if one subflow fails. Maybe I can take a look at the state signals you mentioned. Also, I added a few lines to the example in case you are interested.
🙌 1
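One way to keep a single bad file from failing the whole run is to record the failure and move on instead of re-raising. The sketch below is plain Python rather than Prefect code, and it is only an illustration: `process_all_keep_going` and the `process_one` callback are hypothetical names, and the `input`, `output`, `fail`, and `success` folders are assumed to exist already.

```python
import shutil
from pathlib import Path


def process_all_keep_going(base_dir, process_one):
    """Process every file in base_dir/input and return (succeeded, failed) name lists."""
    base = Path(base_dir)
    succeeded, failed = [], []
    # sorted() takes a snapshot of the listing, so moving files mid-loop is safe.
    for entry in sorted((base / "input").iterdir()):
        if not entry.is_file():
            continue
        try:
            process_one(entry, base / "output")
        except Exception as exc:
            # Record the failure and continue with the next file instead of raising.
            print(f"processing failed for {entry.name}: {exc}")
            shutil.move(str(entry), str(base / "fail" / entry.name))
            failed.append(entry.name)
        else:
            shutil.move(str(entry), str(base / "success" / entry.name))
            succeeded.append(entry.name)
    return succeeded, failed
```

The caller can then decide what a non-empty `failed` list should mean, for example logging it and finishing normally, or raising once at the very end after every file has had its turn.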