Jakub Cieslik
    4 months ago
    Hi, I have a quick question about referencing a past flow run in the current flow. Let's say I'm processing files in one run of the flow; the next time I execute it, I want to process only the new files not yet processed, i.e. only those that were created after the last successful run (maybe with some overlap just in case). I'm pretty sure a time-based approach is not the best solution, but it would work for my case. For simplicity I don't want to keep state of already-processed files. What would be a common solution to this problem?
Kevin Kho
    4 months ago
    For this, you really need to hold some form of state about what has already been processed. We have the KV Store, which lets you persist the last id or timestamp so that you only need to process things after that.
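    The watermark pattern described above can be sketched as follows. This is a minimal, hypothetical illustration: a local JSON file stands in for the KV Store, and the function and file names are made up for the example. With Prefect Cloud you would use `prefect.backend.set_key_value` / `get_key_value` instead of the local file.

```python
import json
import os
from datetime import datetime, timedelta

# Hypothetical stand-in for the KV Store; on Prefect Cloud you would call
# prefect.backend.set_key_value / get_key_value with a key like "last_run".
STATE_FILE = "last_run_watermark.json"

def get_watermark(default):
    """Return the stored watermark, or `default` if none has been saved yet."""
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE) as f:
            return datetime.fromisoformat(json.load(f)["watermark"])
    return default

def set_watermark(ts):
    """Persist the watermark after a successful run."""
    with open(STATE_FILE, "w") as f:
        json.dump({"watermark": ts.isoformat()}, f)

def select_new_files(files, watermark, overlap=timedelta(minutes=5)):
    """Keep only files created after the watermark, minus a safety overlap.

    `files` is a list of (name, created_at) pairs; the overlap re-includes
    files created just before the watermark, "just in case".
    """
    cutoff = watermark - overlap
    return [name for name, created in files if created > cutoff]
```

    The key point is that only the single watermark value is persisted between runs, not a list of processed files; if the run fails before `set_watermark` is called, the next run simply reprocesses from the old watermark.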
Jakub Cieslik
    4 months ago
    Thanks for the super fast answer! The KV Store is really neat and would indeed allow doing this with very little overhead, so I might go that path after all. But in general I couldn't find in the docs how to programmatically access things such as the list of past runs, the list of flows, etc. For instance: prefect.get_flow(flow_id).list_flow_runs(). Is such a thing possible? I found things like FlowRunView that I can instantiate with the UUID, but what if I don't have it?
Kevin Kho
    4 months ago
    You need to run a GraphQL query inside your code to fetch that.
    Really, the KV Store is the easier solution because it's more robust to failure (the new value just doesn't get written, so all data gets reprocessed).
    If you go down the GraphQL API path, you would need to code your own failure handling to identify whether the last flow run failed.
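    As a sketch of what such a query might look like: the snippet below builds a GraphQL query for the most recent successful run of a flow. The field names (`flow_run`, `where`, `order_by`, `end_time`) follow Prefect 1.x's Hasura-backed schema; treat them as assumptions and verify them in the Interactive API tab before relying on them.

```python
def last_successful_run_query(flow_id):
    """Build a GraphQL query string for the most recent successful flow run.

    Field names are assumed from Prefect 1.x's schema; check them against
    the Interactive API before use.
    """
    return """
    query {
      flow_run(
        where: {flow_id: {_eq: "%s"}, state: {_eq: "Success"}},
        order_by: {end_time: desc},
        limit: 1
      ) {
        id
        end_time
      }
    }
    """ % flow_id

# To execute it you would use the Prefect client, e.g.:
#   from prefect.client import Client
#   result = Client().graphql(last_successful_run_query("your-flow-id"))
```

    Note that this only tells you about the last *successful* run; as mentioned above, you still need your own logic for what to do when the last run failed, which is why the KV Store watermark is the simpler option.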
Jakub Cieslik
    4 months ago
    Yeah, totally agree; for this I'll go the state path after all. I'm a bit new to workflow schedulers in general and was surprised that this functionality is not there, at least not in the official Python API client (which could mean it is actually not that important). Does that mean it's actually not that common to have scenarios that need it? For instance, I would like to programmatically access the outputs of different flow runs and analyze them, compare them, etc. In such a scenario, would I need to call the GraphQL API directly to fetch ids, output locations, etc.?
Kevin Kho
    4 months ago
    Output is not tied to a flow; it's tied to a task. So there is a get_task_run_result task where you specify the flow run id and the task you want the result from, and it gets loaded in. The KV Store was created with these kinds of use cases in mind. It is a common scenario to have incremental updating, but normally you don't need the previous result loaded in; you just need the "watermark" of the last execution for the next one. What do you plan to do with the previous result?