Thread
#prefect-community

    William Grim

    7 months ago
    I’ve got a question. It seems that when you use an idempotency key, the key sticks around for 24hrs even if the flow run finished, meaning you can’t run the flow with the same key again. The problem with this is that it would be nice to be able to say “while a flow is running with this key, another one cannot, but when the flow finishes, the key becomes available again.” Is there a way to do this? Is there some sort of GraphQL mutation or anything else that can be executed to free up an idempotency key as soon as a flow finishes?
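For context, the deduplication behavior described here can be modeled roughly as follows. This is a toy, in-memory sketch of the semantics only, not Prefect code; `ToyBackend` and its method names are invented for illustration, and the real pair-scoping is William's reading of the behavior ("flow_id, idempotency_key"), confirmed later in the thread.

```python
# Toy, in-memory model of the semantics described above -- NOT Prefect
# code. ToyBackend is an invented name; it only illustrates that a
# repeated (flow_id, idempotency_key) pair maps back to the same run
# instead of creating a new one.
import uuid


class ToyBackend:
    def __init__(self):
        # (flow_id, idempotency_key) -> flow_run_id
        self._seen = {}

    def create_flow_run(self, flow_id, idempotency_key=None):
        if idempotency_key is None:
            return str(uuid.uuid4())  # no key: always a fresh run
        pair = (flow_id, idempotency_key)
        if pair not in self._seen:
            self._seen[pair] = str(uuid.uuid4())
        # a reused key returns the existing run rather than a new one
        return self._seen[pair]
```

In this toy model the key never expires, which matches Kevin's clarification below that the real keys are permanent.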
    Kevin Kho

    7 months ago
    Is this different from Flow run concurrency limits?

    William Grim

    7 months ago
    Yeah. I don’t want more than one “job” running at a time. When the flow finishes, I want the key to be “freed” so it can be run again on-demand.
    Right now, idempotency keys have a 24-hour timeout, per the docs, and based on what I’m seeing in local testing. The only time they’re freed earlier is if the flow is re-registered, in which case I think that’s just a “virtual free”, because I think the pair is really “flow_id, idempotency_key”.
    If I’m misunderstanding concurrency limits, okay, but I think you have to know the labels ahead of time and have agents set up and ready. I really want to use the same label so Prefect can load balance (once we finally get Kubernetes agents going), except I don’t want to allow more than one run per idempotency_key at a time, freeing the key up afterward. But if there’s a way to do it with concurrency limits and generated labels or something, I’m all ears. Open to learning something new. 🙂
    Kevin Kho

    7 months ago
    So the first thing I should mention is that the idempotency key does not expire; the docs are outdated. The key is permanent, so my gut tells me that is not where we should be looking.
    So I guess using a concurrency limit of 1 is the closest thing. You can add an extra label to this Flow and add it on the agent, since the agent labels just need to be a superset of the flow labels.

    William Grim

    7 months ago
    @Kevin Kho but I would need to be able to generate the labels programmatically. Is there any way to say “this well-known computed label can only have a concurrency limit of 1” without having to know all the labels in advance?
    Kevin Kho

    7 months ago
    How would you pass in the label? Fetch at runtime from somewhere?
    Because the only way I can think of accomplishing this is persisting some kind of global state somewhere, like the KV Store. Before the Flow begins, you can look up the state, and if it shows something else is running, just look it up again after a certain period of time.
    OK, here’s my current thinking: if you don’t want the 2nd Flow to even start, the mechanism has to happen at the orchestration level, and that has to be Flow Run Concurrency Limits using labels, or a re-architecting of the problem. If you can let the second Flow run and just pause indefinitely, then it can be done in a state handler, and you can do it more dynamically.
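Kevin's KV Store idea could be sketched roughly like this. This is a hypothetical pattern, not official Prefect guidance: the key naming, polling parameters, and acquire/release helpers are all assumptions, and in a real flow `get_value`/`set_value` would wrap `prefect.backend.get_key_value` / `set_key_value` (here they are injected so the logic is testable without a backend). The check-then-set is not atomic, which is exactly why a hard guarantee has to live at the orchestration level.

```python
# Hypothetical sketch of the KV Store mutex idea above (Prefect 1.x).
# get_value/set_value are injected so the logic is testable; in a real
# flow they would wrap prefect.backend.get_key_value / set_key_value.
# CAVEAT: check-then-set here is not atomic, so two runs polling at the
# same instant could both claim the slot.
import time


def acquire_mutex(get_value, set_value, key, run_id,
                  poll_seconds=30, max_attempts=120):
    """Poll until the KV slot is free, then claim it with our run id."""
    for attempt in range(max_attempts):
        holder = get_value(key)
        if holder in (None, "", run_id):
            set_value(key, run_id)  # claim the slot
            return True
        if attempt < max_attempts - 1:
            time.sleep(poll_seconds)  # someone else holds it; wait
    return False


def release_mutex(set_value, key):
    """Free the slot when the flow finishes (e.g. in a state handler)."""
    set_value(key, "")
```

A flow would call `acquire_mutex` as its first step and `release_mutex` from a terminal-state handler, so the key is freed as soon as the run ends, which is the behavior William asked for at the top of the thread.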

    William Grim

    7 months ago
    @Kevin Kho yeah, if we have to do something custom, it’s going to be checking flow run states that have particular patterns in the run_names. To your earlier question, we would use
    client.create_flow_run(…, run_config=RunConfig(labels=[my_fancy_label], …))
    if we are able to do it that way.
    Kevin Kho

    7 months ago
    Yeah I don’t think that’ll work right? Cuz even if you had a dynamic label on the Flow side, we don’t get this luxury on the agent side right?

    William Grim

    7 months ago
    Correct, and that’s a problem for this use-case. It would be nice if we could have some sort of “mutex_key” or something that lets the flow run just once while the key is active and then can be run again when the flow that was using the key goes away.
    Kevin Kho

    7 months ago
    Would you be able to give me more details around this use case so I can think about it clearer?
    Though I wouldn’t be surprised if we really did indeed hit the limitations

    William Grim

    7 months ago
    Yeah, let me type it out. If you see it saying I’m typing, it’s just that I’m trying to be thoughtful.
    We have a use-case where we want to run a flow that takes a very long time to run. Given a set of parameters to the flow, we only want one running instance for those params. However, if a different set of params is given, with a different “mutex_key”, we would like that one to be able to run at the same time as the first one. Think of it like this: we want to run this flow for different customers against their data. It’s okay that it takes a long time, and it’s okay to run against each customer individually. However, we don’t want to allow more than one flow at a time to hit each customer, because it would compound and eventually cause a major outage on a regular basis, because we are talking about big database queries and things like that underneath it all. So, this use-case comes in very handy when you want to serialize “runs with the same parameters”. Please let me know if you need more clarification.
    Kevin Kho

    7 months ago
    I think I understand it. Let me digest lol. I may respond in a bit
    Yes, the only thing that can help you right now is using state handlers and checking for these slots that you create yourself. It would be good to manage them with the KV Store, but I’ll bring this up to the team to talk about.

    Anna Geller

    7 months ago
    @William Grim there is a potentially easier way to ensure that there is only one active flow run of a specific flow, by using a state handler as shown here: https://gist.github.com/anna-geller/8cf4b56a22e3944864dd1720673ce5c8 - it queries for active flow runs before it starts doing anything, and if there is any active flow run, it marks the run as Skipped. It's neither an idempotency key nor a concurrency limit, but it seems to address the problem.
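For readers without access to the gist, the pattern is roughly the following. This is a hedged sketch, not Anna's exact code: the GraphQL query shape is an assumption about the Prefect 1.x backend schema, and the Prefect imports are deferred inside the handler so the pure helper stands alone; see the gist for the real version.

```python
# Sketch in the spirit of the linked gist (Prefect 1.x) -- hedged, not
# Anna's exact code. The GraphQL query shape is an assumption about the
# backend schema.

def other_running_runs(flow_runs, current_run_id):
    """Pure helper: ids of *other* runs currently in a Running state."""
    return [r["id"] for r in flow_runs if r["id"] != current_run_id]


def skip_if_already_running(flow, old_state, new_state):
    """State handler: when this run enters Running, ask the backend for
    other Running runs of the same flow; return Skipped if any exist."""
    if new_state.is_running():
        import prefect  # deferred so the pure helper is importable alone
        from prefect import Client
        from prefect.engine.state import Skipped

        client = Client()
        query = """
        query($flow_id: uuid) {
          flow_run(
            where: {flow_id: {_eq: $flow_id}, state: {_eq: "Running"}}
          ) { id }
        }
        """
        result = client.graphql(
            query, variables={"flow_id": prefect.context.flow_id}
        )
        if other_running_runs(result["data"]["flow_run"],
                              prefect.context.flow_run_id):
            return Skipped("Another run of this flow is already Running.")
    return new_state
```

The handler would be attached at flow definition time, e.g. `Flow("my-flow", state_handlers=[skip_if_already_running])`.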

    William Grim

    7 months ago
    @Anna Geller that looks great! I really really appreciate the code snippet. I’m going to use that and add name to the result to help filter on that too.
    @Anna Geller overall, your approach works pretty nicely. I really appreciate it. I’m having one issue though, and that’s making sure that if, let’s say, 5 of the same flow+run_name are scheduled at the same time, this handler won’t notice, because it limits its checks to runs in the Running state. Do you know if there’s any way to “serialize” those checks so that at most one check for this flow is happening at a time across the agents?
    Hmm, I suppose if multiple flows do happen to come alive, we can just do an additional check and see that the original “message” to run was already captured. That’s pretty easy. Not perfect, but doable. Though, if you do have ideas for my question about serializing the state checker, I’d love some feedback. 😄
    Anna Geller

    7 months ago
    Gotcha. I believe the cleanest solution here would be to use concurrency limits to ensure that you never get 5 scheduled flow runs of this flow: if you allow at most, say, 2 concurrent runs of it, the extra flow runs get queued. An alternative would be to assign yet another state handler to the same flow (a single flow may have several state handlers!) that performs a similar query, but for Scheduled flow runs of that flow, and cancels those, e.g. using:
    client.cancel_flow_run(flow_run_id="uuid from the query")
    Perhaps that's what you meant by the additional check 🙂
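That second handler might look roughly like this. This is my sketch of the idea, with the same caveats as before: the GraphQL query shape is an assumption about the Prefect 1.x schema, and only `client.cancel_flow_run(flow_run_id=...)` comes from Anna's message.

```python
# Hedged sketch of the second state handler suggested above: cancel
# *other* Scheduled runs of the same flow. Query shape is an assumption.

def duplicate_scheduled_ids(scheduled_runs, current_run_id):
    """Pure helper: every Scheduled run id except our own."""
    return [r["id"] for r in scheduled_runs if r["id"] != current_run_id]


def cancel_duplicate_scheduled(flow, old_state, new_state):
    """When this run starts, cancel any other Scheduled runs of the flow."""
    if new_state.is_running():
        import prefect  # deferred so the pure helper stands alone
        from prefect import Client

        client = Client()
        query = """
        query($flow_id: uuid) {
          flow_run(
            where: {flow_id: {_eq: $flow_id}, state: {_eq: "Scheduled"}}
          ) { id }
        }
        """
        result = client.graphql(
            query, variables={"flow_id": prefect.context.flow_id}
        )
        for run_id in duplicate_scheduled_ids(
            result["data"]["flow_run"], prefect.context.flow_run_id
        ):
            client.cancel_flow_run(flow_run_id=run_id)
    return new_state
```

Since a flow may have several state handlers, this can sit alongside the skip-if-running handler on the same flow.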

    William Grim

    7 months ago
    @Anna Geller I think I came up with a way to guarantee what I want. I just discovered that the State object can give “parents/children”, and if I combine old_state and new_state stuff together using names_only=True, then I can build something by limiting the only valid run to whatever the lowest id is from flow_run. At least… I’m hoping flow_run.id is the same as prefect.context.flow_run_id, ha.
    I’m gonna note down what you said for cancelling flows too and look into that in more detail in a bit. Either way, you’ve shown me a ton about what can be done with state handlers.
    Can I contribute this state handler back to y’all if I get it doing what we want? This is a valid use-case that I imagine a lot of people have in situations like ours.

    Anna Geller

    7 months ago
    This would be great! I actually just wanted to ask whether you could share your end solution 🙂 Thanks a lot!

    William Grim

    7 months ago
    Yeah, I think this will be generic enough to share. I appreciate it! I’ll get in touch when I get sign-off from our company to do that. @Anna Geller