Thread
#prefect-community
    Chandra Manginipalli

    2 years ago
    Hi, we have a couple of requirements we are evaluating Prefect for, since Airflow seems a little restrictive. Does anyone happen to know the answers to these? I can't find them online.
    1. If I am running the same flow multiple times in parallel, one run per file, can I pause/resume one of the flow runs (or the tasks under it) without impacting the other runs of the same flow?
    2. If I batch the rows in a file, can I run the same task iteratively, once per batch, in sequence? (I saw the LOOP signal online, but I don't need recursion.) And would pausing this task pause the subsequent iterations?
    3. Is there a REST API or CLI that we could use to start a flow run or to pause/resume one? Airflow has a REST API and CLI that let us pause a DAG or run tasks. Wondering if something similar exists in Prefect.
    nicholas

    2 years ago
    Hi @Chandra Manginipalli!
    1. Yes, that's 100% supported.
    2. Batching operations is definitely supported, as is pausing mapped tasks, but I'm not sure what you mean by pausing subsequent iterations: would you want the other tasks in the mapped set to be paused, or would you want their downstream tasks to pause execution instead?
    3. Yes! We have a GraphQL API that you can use to approve paused tasks, as well as a Python Client that can fully interact with the API, meaning you can use either Python or something like cURL!
    Kyle Moon-Wright

    2 years ago
    Thanks @nicholas, some things I'll add:
    1. When you kick off a flow, you're actually creating and starting a FlowRun (an instance of a Flow), so these instances are independent of one another and you can pause/cancel/start each one as you need.
    2. Hmm, yes, but rather than using LOOP signals, I think it would be better to use task mapping to work on your batches. This way, you can apply your task over a bunch of items and Prefect will dynamically create child tasks for you (you'll also be able to pause the mapped tasks).
    3. Everything is available to you via a GraphQL API, which you can read more about here. In this case, you'd probably want to use the create_flow_run, set_flow_run_states, and/or cancel_flow_run mutations.
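A minimal Python sketch of point 3, assuming a Prefect Server GraphQL endpoint at http://localhost:4200/graphql and an input type named create_flow_run_input (both are assumptions; check them against your own API schema). It builds, but does not send, one create_flow_run request per file, so each file gets its own flow run that can be paused or cancelled independently. The flow ID "MY_FLOW_ID" and the file_path parameter are hypothetical placeholders.

```python
import json
import urllib.request

# Assumption: a local Prefect Server GraphQL endpoint; replace with your own.
GRAPHQL_URL = "http://localhost:4200/graphql"

# Assumed mutation shape; verify the input type name against your API schema.
CREATE_FLOW_RUN = """
mutation CreateRun($input: create_flow_run_input!) {
  create_flow_run(input: $input) { id }
}
"""


def build_create_flow_run_request(flow_id, file_path, url=GRAPHQL_URL):
    """Build (but do not send) a POST request that starts one flow run for one file."""
    variables = {
        "input": {
            "flow_id": flow_id,
            # "file_path" is a hypothetical flow Parameter name.
            "parameters": {"file_path": file_path},
        }
    }
    body = json.dumps({"query": CREATE_FLOW_RUN, "variables": variables}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# One independent flow run per file.
requests_per_file = [
    build_create_flow_run_request("MY_FLOW_ID", path) for path in ["a.csv", "b.csv"]
]
```

To actually submit a request, pass it to urllib.request.urlopen; a hosted API would additionally need an API token, typically sent as an Authorization header.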
    Chandra Manginipalli

    2 years ago
    Thanks @nicholas and @Kyle Moon-Wright. That is exactly what I am looking for. The fact that I can pause/cancel/start individual flow runs and the GraphQL API to do these is just perfect!
    About the LOOP, to clarify: I have to process a file in batches of, say, 100 rows at a time. I was imagining a task called process_batch, as below: task_a would break the file into batches, and process_batch would process each batch. Can I not just call my task in a for loop, like below, or would you still suggest task mapping? I was imagining task mapping would run them all in parallel, but I want each batch to be processed in sequence.
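    from prefect import Flow  # Prefect 1.x-style (Flow context) API

    # process_batch is a @task; batches must be a plain Python list known when the flow is built
    with Flow("for loop Flow") as forflow:
        for batch in batches:
            process_batch(batch)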
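As a plain-Python sketch of the task_a step (splitting a file's rows into batches of 100), something like the following would work. batched_rows and the row-counting process_batch are hypothetical stand-ins, not Prefect APIs. Because batches come out in file order, iterating over them processes each batch in sequence; in a Prefect 1.x flow the resulting list could instead be handed to process_batch.map(...), which is the task mapping suggested in this thread.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched_rows(rows: Iterable[str], size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `size` rows, preserving file order."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def process_batch(batch: List[str]) -> int:
    # Hypothetical stand-in for the real per-batch work; here it just counts rows.
    return len(batch)


rows = [f"row-{i}" for i in range(250)]
results = [process_batch(b) for b in batched_rows(rows, 100)]
# results == [100, 100, 50]
```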
    Kyle Moon-Wright

    2 years ago
    I'd still move towards mapping as much as you're able. I tend to avoid changing task states unless I really need to, so as not to touch the underlying engine. I believe your flow will still work, but the potential for error here is calling the same process_batch task for each batch rather than creating a new process_batch task for each batch. So my vote is to break up the batches, then map process_batch onto each batch dynamically (which will still run in sequence, rather than in parallel, with the default executor).
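To make the "new task per batch" point concrete, here is a toy plain-Python model of sequential mapping; it is not Prefect's implementation. Each batch gets its own child run with its own map index and state, the children run one after another, and a failure is recorded on that child alone instead of stopping its siblings. run_mapped_sequentially and process_batch are hypothetical names.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_mapped_sequentially(
    fn: Callable[[T], R], items: List[T]
) -> List[Tuple[int, str, object]]:
    """Toy model of mapping: one child run per item, executed one after another.

    Each child records its own (map_index, state, result-or-error), so one
    failing child does not stop its siblings.
    """
    children = []
    for map_index, item in enumerate(items):
        try:
            children.append((map_index, "Success", fn(item)))
        except Exception as exc:  # the failure is recorded on this child only
            children.append((map_index, "Failed", repr(exc)))
    return children


def process_batch(batch: List[int]) -> int:
    # Hypothetical per-batch work: reject empty batches, otherwise sum them.
    if not batch:
        raise ValueError("empty batch")
    return sum(batch)


children = run_mapped_sequentially(process_batch, [[1, 2], [], [3, 4]])
# children == [(0, "Success", 3), (1, "Failed", "ValueError('empty batch')"), (2, "Success", 7)]
```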
    Chandra Manginipalli

    2 years ago
    @Kyle Moon-Wright, @nicholas - How do I pause/resume the flow run from GraphQL? Is it essentially a set_flow_run_states mutation to Paused and then back to Scheduled?
    nicholas

    2 years ago
    Hi @Chandra Manginipalli - since flow runs never actually enter the Paused state, you only need to set the state of any Paused task runs. This gives more flexibility (and multiple manual checkpoints, if so desired). You can resume a run by setting any task run in the Paused state to a Resume state with the following mutation:
    mutation Approve {
      set_task_run_states(input: { states: [ {
        task_run_id: "PAUSED_TASK_RUN_ID",
        state: { type: "Resume", message: "A message that will appear in the logs saying why/who resumed this" }
      } ] }) {
        states { id }
      }
    }
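The same mutation can be sent with GraphQL variables instead of a hand-built string. A minimal Python sketch, assuming the input type is named set_task_run_states_input (an assumption; check your API schema). build_resume_payload is a hypothetical helper that only builds the JSON body you would POST to the GraphQL endpoint.

```python
import json

# Assumed input type name; verify against your API schema.
RESUME_MUTATION = """
mutation Approve($input: set_task_run_states_input!) {
  set_task_run_states(input: $input) { states { id } }
}
"""


def build_resume_payload(task_run_id: str, message: str) -> str:
    """Return the JSON body that moves one Paused task run to Resume (not sent here)."""
    variables = {
        "input": {
            "states": [
                {
                    "task_run_id": task_run_id,
                    "state": {"type": "Resume", "message": message},
                }
            ]
        }
    }
    return json.dumps({"query": RESUME_MUTATION, "variables": variables})


body = build_resume_payload("PAUSED_TASK_RUN_ID", "Approved after review")
```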
    For more information on that, one of our team members just wrote a great blog post today about that process: https://medium.com/the-prefect-blog/needs-approval-184f2512a3cf
    Chandra Manginipalli

    2 years ago
    Thanks @nicholas. Funny thing is, I actually just tried to pause the flow run, and it did pause the tasks under it (they showed as Pending). Once I set the flow run's state back to 'Scheduled', it started running the tasks again. I didn't change the task states explicitly. It's kind of the behavior I want, so I'll take it 🙂 Code used to pause the flow run:
    mutation {
      set_flow_run_states(input: {states: [{
        flow_run_id: "c02fe104-c7aa-4823-9134-11988926c0b1",
        version: 6,
        state: "{\"type\": \"Paused\"}"
      }]}) {
        states { id }
      }
    }
    Code used to set the flow run back to Scheduled so it runs again:
    mutation {
      set_flow_run_states(input: {states: [{
        flow_run_id: "c02fe104-c7aa-4823-9134-11988926c0b1",
        version: 6,
        state: "{\"type\": \"Scheduled\"}"
      }]}) {
        states { id }
      }
    }
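The hand-escaped state string used in these mutations can be generated rather than typed. In this small Python sketch, set_flow_run_state_mutation is a hypothetical helper, and the flow run ID and version 6 are copied from the messages above. It serializes the state with json.dumps and then quotes it a second time with json.dumps, which yields a valid GraphQL string literal without manual backslashes.

```python
import json


def set_flow_run_state_mutation(flow_run_id: str, version: int, state_type: str) -> str:
    """Build the set_flow_run_states mutation text for one flow run.

    The state object is serialized with json.dumps, then quoted again with
    json.dumps so it becomes an escaped GraphQL string literal.
    """
    state_literal = json.dumps(json.dumps({"type": state_type}))
    run_id_literal = json.dumps(flow_run_id)
    return (
        "mutation {\n"
        "  set_flow_run_states(input: {states: [{\n"
        f"    flow_run_id: {run_id_literal},\n"
        f"    version: {version},\n"
        f"    state: {state_literal}\n"
        "  }]}) {\n"
        "    states { id }\n"
        "  }\n"
        "}\n"
    )


pause = set_flow_run_state_mutation("c02fe104-c7aa-4823-9134-11988926c0b1", 6, "Paused")
```

Calling it with "Scheduled" instead of "Paused" produces the second mutation.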
    nicholas

    2 years ago
    Hm, interesting. Glad you found that it worked for you @Chandra Manginipalli! 🙂