Thread
#prefect-community

    Sven Teresniak

    2 years ago
    Hi there, we need some synchronization between flow runs. It is possible to use tags for tasks and labels for flows, but not possible to use tags or labels for flow runs, right? Now, the idea is to write a ResourceManager as a kind of Lock (reentrant or not, semaphore, or whatever flavor). Then you could (hopefully) enter a ResourceManager with-block the same way you would use a Lock in Python code. Something like:
    1. setup() searches (as in "asking Prefect") for all flow runs with a given tag (that is, a run of a flow that has this certain tag set)
    2. if setup() does NOT find another flow run with that tag, the tag is set (I don't know yet where and how)
    3. if setup() DOES find another matching flow run (or RM): do some polling or return with a successful error (depending on configuration)
    4. then all the tasks in the RM's with environment run (this is the part that should be synchronized/locked)
    5. on exit, the RM's cleanup() is called to remove the tag/label/whatever from the current flow (to allow other flows to set the tag)
    Do you think this is a good idea? Which/where is the best way to store that tag? Using the Context, Parameters, Tags, Labels? If you think this could be a nice feature, we could create a prototype and pull request. We need to prevent a race condition here, and for that we need to make the execution of several tasks isolated (the I in ACID in RDBMS).

    Matt Wong-Kemp

    2 years ago
    Could you use idempotency keys on flow runs for this?
    idempotency_key (str, optional): an idempotency key; if provided, this run will be cached for 24 hours. Any subsequent attempts to create a run with the same idempotency key will return the ID of the originally created run (no new run will be created after the first). An error will be raised if parameters or context are provided and don't match the original. Each subsequent request will reset the TTL for 24 hours.
    (from https://docs.prefect.io/api/latest/client/client.html#client-3)

    Sven Teresniak

    2 years ago
    no, this is completely orthogonal. the idempotency key guarantees that you do not RESTART a certain task
    but this is a completely different situation. synchronizing concurrency is just about avoiding race conditions
    e.g. a simple flow:
    1. check existence of file X
    2. if not present: generate some data
    3. save data to X
    now you have 2 flow runs, A and B:
    1. A checks for file X
    2. B checks for file X
    3. A writes data to X
    4. B writes data to X
    5. …
    you know what i mean
    it is NOT about writing a file. it's about the race condition
    with a lock you would have this:
    1. A tries to acquire lock -- success
    2. B tries to acquire lock -- fail (polls for the lock or successfully fails)
    3. A checks for file X (B still waits for the lock)
    4. A writes to X (B still polling)
    5. A releases the lock
    6. A exits. B acquires lock
    7. B checks for file X
    8. X exists -> B exits
    again, it's not about writing files.
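The locked sequence above can be illustrated with two threads standing in for flow runs A and B. This is only a sketch: `store` is a hypothetical stand-in for file X, and `threading.Lock` only works inside one process, whereas real flow runs would need a lock shared across processes or machines.

```python
import threading

store = {}       # stands in for "file X"
generated = []   # records which run actually generated the data
lock = threading.Lock()


def flow_run(name):
    # The existence check and the write form one critical section,
    # so the second run always sees the first run's result.
    with lock:
        if "X" not in store:
            generated.append(name)
            store["X"] = f"data from {name}"


threads = [threading.Thread(target=flow_run, args=(n,)) for n in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Whichever run gets the lock first generates the data; the other finds X present and exits without writing.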

    nicholas

    2 years ago
    Hi @Sven Teresniak - I don't think we have anything like this at the moment. In a trivial example, it sounds like you'd achieve this best by taking the tasks whose execution needs to be strictly governed, which you'd otherwise split across two flows or runs, and making them different tasks in the same flow. That would let you take advantage of data/non-data dependencies and keep strict control over parallelism through task tag concurrency.

    Sven Teresniak

    2 years ago
    @nicholas okay. First: By combining several tasks into ONE task to get the I from ACID (isolation), we would completely lose parallelism and the power of intermediate results and retries (e.g. when the first step in this kind of combined task is an expensive job and the task fails later). Second: The tag is only available at runtime. Is it possible to generate task tags from Prefect Parameters?
    And just out of curiosity: Is it a bad idea or bad design to re-use a ResourceManager as a Lock?

    nicholas

    2 years ago
    Sorry @Sven Teresniak - I meant to combine tasks from disparate flows into a single flow, not a single task; combining into a single flow will give you all the benefits you mentioned. Yes you can add tags to parameters, which is covered here in the docs.
    I'd say using a resource manager as a lock in general might lead to unexpected results but it really depends on the use case.
    Since it seems like this might be a larger discussion (and something you may want to PR), I'd encourage you to move these thoughts to a GitHub discussion thread on the Core repo.

    Sven Teresniak

    2 years ago
    The idea is to let the setup() method block until the current flow run cannot find any other flow run in the system that matches a certain tag (which is only available at runtime and based on scheduled time).
    The question is: how difficult is it to query other flow runs for their context and/or tags
    My expectation is that some graphql magic could do the job.

    nicholas

    2 years ago
    You can definitely query for flow runs with GraphQL, and you can tailor the query to look for a certain context. Core comes with a built-in GraphQL client that should make this pretty straightforward; you'd use a query something like this:
    query {
      flow_run(
        # replace {} with the JSON value to compare against
        where: { context: { _eq: {} } }
      ) {
        id
        # other fields
      }
    }
    You can use the Interactive API to help put that query together.
    You can make that query from within a task, which you can use as the decision point you've described
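A polling decision point built on that kind of query might look like the sketch below. The names `other_matching_runs`, `wait_until_alone`, and the injected `run_query` callable are hypothetical; `run_query(query, variables)` is assumed to send the request and return the decoded JSON response (presumably via the built-in GraphQL client mentioned above). The `jsonb` variable type and the response shape are also assumptions, not something confirmed in this thread.

```python
import time

# Same shape as the query above, with the context passed as a variable.
QUERY = """
query ($ctx: jsonb) {
  flow_run(where: { context: { _eq: $ctx } }) {
    id
  }
}
"""


def other_matching_runs(run_query, context_filter, own_id):
    """Return ids of matching flow runs other than the current one."""
    result = run_query(QUERY, {"ctx": context_filter})
    return [r["id"] for r in result["data"]["flow_run"] if r["id"] != own_id]


def wait_until_alone(run_query, context_filter, own_id,
                     poll_interval=1.0, max_polls=60):
    """Poll until no other matching run is found.

    Returns True when the current run is alone, False if polls run out.
    """
    for _ in range(max_polls):
        if not other_matching_runs(run_query, context_filter, own_id):
            return True
        time.sleep(poll_interval)
    return False
```

A real query would probably also need to filter on run state so that finished runs are ignored. Note that check-then-proceed is not atomic: two runs polling at the same moment could both conclude they are alone.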

    Sven Teresniak

    2 years ago
    perfect! 😄
    this is not the same as locking but it would do the job 🙂

    nicholas

    2 years ago
    Great!

    Sven Teresniak

    2 years ago
    (for server mode)

    nicholas

    2 years ago
    That should be available soon.

    Sven Teresniak

    2 years ago
    Damn, just one problem: We have to limit the parallel execution of a certain flow for a certain parameter/schedule time.
    A label is fixed from the time you register a flow, right?

    nicholas

    2 years ago
    That's correct, but that's where you could use schedule parameters/context, I think

    Sven Teresniak

    2 years ago
    As discussed above?
    I'll try to implement this as a prototype or proof of concept. I think it's not that hard. Then I will create an enhancement PR to discuss this with other maintainers.
    (I don't like to maintain a fork)
    okay?

    nicholas

    2 years ago
    Well, a combination of the above (which is highly customizable) and a complex schedule sounds like it should accomplish what you're looking for.
    That sounds great!

    Sven Teresniak

    2 years ago
    👍