# prefect-community
s
Hi there, we need some synchronization between flow runs. It is possible to use `tag`s for tasks and `label`s for flows, but it is not possible to use tags or labels for flow runs, right? Now, the idea is to write a `ResourceManager` as a kind of `Lock` (reentrant or not, a semaphore, or whatever flavor). Then you could (hopefully) enter a ResourceManager `with` block the same way you would use a `Lock` in Python code. Something like:

1. `setup()` searches (as in "asking Prefect") for all flow runs with a given tag (that is, a run of a flow that has this tag set).
2. If `setup()` does NOT find another flow run with that tag, the tag is set (I don't know yet where and how).
3. If `setup()` DOES find another matching flow run (or RM), it either polls or returns with a controlled failure (configurable).
4. Then all the tasks inside the RM's `with` block run (this is the part that should be synchronized/locked).
5. On exit, the RM's `cleanup()` is called to remove the tag/label/whatever from the current flow run (to allow other flow runs to set the tag).

Do you think this is a good idea? Which/where is the best way to store that tag: the Context, Parameters, Tags, or Labels? If you think this could be a nice feature, we could create a prototype and a pull request. We need to prevent a race condition here, and for that we need to make the execution of several tasks isolated (the I in ACID in an RDBMS).
m
Could you use idempotency keys on flow runs for this?
```
idempotency_key (str, optional): an idempotency key; if provided, this run will be cached for 24 hours. Any subsequent attempts to create a run with the same idempotency key will return the ID of the originally created run (no new run will be created after the first). An error will be raised if parameters or context are provided and don't match the original. Each subsequent request will reset the TTL for 24 hours.
```
(from https://docs.prefect.io/api/latest/client/client.html#client-3)
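To make the quoted semantics concrete, here is a toy model in Python (hypothetical `ToyRunApi`, not Prefect's client). It models only what the docstring states: a repeated key returns the original run id, mismatching parameters raise an error, and each request resets the 24-hour TTL. What it deduplicates is run *creation*; runs created under different keys are not coordinated in any way.

```python
from __future__ import annotations

import uuid

TTL_SECONDS = 24 * 60 * 60  # 24 hours, per the quoted docstring


class ToyRunApi:
    """Toy model of the documented idempotency_key behavior (not Prefect code)."""

    def __init__(self) -> None:
        # idempotency key -> (run id, parameters, expiry timestamp)
        self._by_key: dict[str, tuple[str, dict | None, float]] = {}

    def create_run(self, now: float, idempotency_key: str | None = None,
                   parameters: dict | None = None) -> str:
        cached = self._by_key.get(idempotency_key) if idempotency_key else None
        if cached is not None and now < cached[2]:
            run_id, original_params, _ = cached
            if parameters is not None and parameters != original_params:
                raise ValueError("parameters don't match the original run")
            # Each subsequent request resets the TTL.
            self._by_key[idempotency_key] = (run_id, original_params, now + TTL_SECONDS)
            return run_id  # no new run is created
        run_id = str(uuid.uuid4())
        if idempotency_key:
            self._by_key[idempotency_key] = (run_id, parameters, now + TTL_SECONDS)
        return run_id
```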
s
No, this is completely orthogonal. The idempotency key guarantees that you do not RESTART (re-create) the same run.
But this is a completely different situation: synchronizing concurrent runs is about avoiding race conditions.
E.g. a simple flow:

1. Check the existence of file X.
2. If not present: generate some data.
3. Save the data to X.

Now you have 2 flow runs, A and B:

1. A checks for file X.
2. B checks for file X.
3. A writes data to X.
4. B writes data to X.
5. …
You know what I mean.
It is NOT about writing a file; it's about the race condition.
With a lock you would have this:

1. A tries to acquire the lock: success.
2. B tries to acquire the lock: fail (it polls for the lock, or fails gracefully).
3. A checks for file X (B still waits for the lock).
4. A writes to X (B still polling).
5. A releases the lock.
6. A exits; B acquires the lock.
7. B checks for file X.
8. X exists, so B exits.

Again, it's not about writing files.
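The two interleavings above can be reproduced in a single Python process, with two threads standing in for runs A and B. This is only a sketch: a `threading.Lock` works inside one process, whereas real flow runs would need a lock shared through the backend. The dict `store` stands in for file X, since the file itself is beside the point.

```python
from __future__ import annotations

import threading
import time

store = {}       # stands in for "file X"; the race is the point, not files
generated = []   # names of the runs that actually generated the data
lock = threading.Lock()


def check_then_write(name: str) -> None:
    if "X" not in store:                  # 1. check existence of X
        time.sleep(0.05)                  # 2. generate some data (slow step)
        generated.append(name)
        store["X"] = f"data from {name}"  # 3. save data to X


def flow_run(name: str, use_lock: bool) -> None:
    if use_lock:
        with lock:                        # check and write form one critical section
            check_then_write(name)
    else:
        check_then_write(name)


def run_a_and_b(use_lock: bool) -> list[str]:
    store.clear()
    generated.clear()
    runs = [threading.Thread(target=flow_run, args=(n, use_lock)) for n in ("A", "B")]
    for t in runs:
        t.start()
    for t in runs:
        t.join()
    return list(generated)


print(run_a_and_b(use_lock=False))  # both runs can see X missing, so both may generate
print(run_a_and_b(use_lock=True))   # exactly one run generates; the other finds X
```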
n
Hi @Sven Teresniak, I don't think we have anything like this at the moment. In a simple case, it sounds like you'd achieve this best by taking the tasks whose execution needs to be strictly governed, which you'd otherwise split across two flows or runs, and making them separate tasks in the same flow. That would let you take advantage of data/non-data dependencies and enforce strict concurrency limits through task tag concurrency.
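To illustrate the kind of guarantee a task tag concurrency limit gives, here is a conceptual in-process model (hypothetical `TagConcurrencyLimiter`; this is not how Prefect implements it, and in Prefect the limits are enforced by the backend across runs). Each tag maps to a semaphore, and a task carrying that tag must hold a slot while it runs.

```python
from __future__ import annotations

import threading
import time
from typing import Any, Callable


class TagConcurrencyLimiter:
    """Conceptual model: at most limits[tag] tasks carrying `tag` run at once."""

    def __init__(self, limits: dict[str, int]) -> None:
        self._slots = {tag: threading.BoundedSemaphore(n) for tag, n in limits.items()}

    def run(self, tags: list[str], fn: Callable[..., Any], *args: Any) -> Any:
        # Acquire in a fixed (sorted) order so tasks with several tags cannot deadlock.
        held = [self._slots[t] for t in sorted(set(tags)) if t in self._slots]
        for slot in held:
            slot.acquire()
        try:
            return fn(*args)
        finally:
            for slot in reversed(held):
                slot.release()


limiter = TagConcurrencyLimiter({"file-x": 1})
active = 0
peak = 0
counter = threading.Lock()


def task_body() -> None:
    global active, peak
    with counter:
        active += 1
        peak = max(peak, active)
    time.sleep(0.02)  # simulated work
    with counter:
        active -= 1


workers = [threading.Thread(target=limiter.run, args=(["file-x"], task_body)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(peak)  # 1: the "file-x" tasks never overlapped
```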
s
@nicholas okay. First: by combining several tasks into ONE task to get the I from ACID (isolation), we would completely lose parallelism and the power of intermediate results and retries (e.g. when the first step in such a combined task is an expensive job and a later step fails). Second: the tag is only available at runtime. Is it possible to generate task tags from Prefect Parameters?
And just out of curiosity: Is it a bad idea or bad design to re-use a ResourceManager as a Lock?
n
Sorry @Sven Teresniak, I meant combining tasks from disparate flows into a single flow, not a single task; combining them into a single flow will give you all the benefits you mentioned. Yes, you can add tags to parameters, which is covered here in the docs.
I'd say that using a resource manager as a lock might, in general, lead to unexpected results, but it really depends on the use case.
Since it seems like this might be a larger discussion (and something you may want to PR), I'd encourage you to move these thoughts to a GitHub discussion thread on the Core repo.
s
The idea is to let the `setup()` method block until the current flow run cannot find any other flow run in the system that matches a certain tag (a tag that is only available at runtime and is based on the scheduled time).
The question is: how difficult is it to query other flow runs for their context and/or tags?
My expectation is that some GraphQL magic could do the job.
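A sketch of a blocking `setup()` of that kind, assuming a hypothetical `find_other_runs` callable that returns the ids of other flow runs currently matching the tag (for example, one built on a GraphQL query). The poll interval, the timeout, and raising on timeout are my own choices. As with any check-then-proceed scheme, two runs that poll at the same moment can both see "no other run" unless the tag is set atomically with the check.

```python
import time
from typing import Callable, Sequence


class LockTimeout(TimeoutError):
    """Raised when other matching flow runs are still present after `timeout` seconds."""


def wait_until_exclusive(
    find_other_runs: Callable[[], Sequence[str]],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until find_other_runs() returns no other matching flow run."""
    deadline = clock() + timeout
    while True:
        others = find_other_runs()
        if not others:
            return  # nobody else matches the tag: proceed into the critical section
        if clock() >= deadline:
            raise LockTimeout(f"still blocked by {list(others)} after {timeout}s")
        sleep(poll_interval)
```

`clock` and `sleep` are injectable so the loop can be exercised without real waiting.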
n
You can definitely query for flow runs with GraphQL, and you can tailor the query to look for a certain context. Core comes with a built-in GraphQL client that should make this pretty straightforward; you'd use a query like this:
```graphql
query ($filter: jsonb) {
  # _contains matches runs whose context includes this JSON subset
  flow_run(where: { context: { _contains: $filter } }) {
    id
    # other fields
  }
}
```
You can use the Interactive API to help put that query together.
You can make that query from within a task and use that task as the decision point you've described.
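A sketch of that decision point in Python. It assumes a client object with a `graphql(query, variables=...)` method that returns a dict-like result (my understanding is that Prefect 1.x's `prefect.Client().graphql` fits this shape, but please verify), and it assumes the lock tag is stored under a hypothetical `lock_tag` key in the flow run context. The GraphQL variable types (`jsonb`, `uuid`, `String`) and the choice of which states count as holding the lock are also assumptions; the Interactive API is the place to check them.

```python
from __future__ import annotations

from typing import Any, Iterable

# Other flow runs whose context contains the lock tag and that are in an "active" state.
OTHER_RUNS_QUERY = """
query ($filter: jsonb, $current_id: uuid, $states: [String!]) {
  flow_run(
    where: {
      context: { _contains: $filter }
      state: { _in: $states }
      id: { _neq: $current_id }
    }
  ) {
    id
  }
}
"""


def find_other_runs(client: Any, lock_tag: str, current_run_id: str,
                    active_states: Iterable[str] = ("Running",)) -> list[str]:
    """Return ids of other flow runs whose context holds `lock_tag` (hypothetical key)."""
    result = client.graphql(
        OTHER_RUNS_QUERY,
        variables={
            "filter": {"lock_tag": lock_tag},
            "current_id": current_run_id,
            "states": list(active_states),
        },
    )
    return [row["id"] for row in result["data"]["flow_run"]]
```

Passing the client in as a parameter keeps the function testable with a fake client, and an empty result is the signal to proceed.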
s
Perfect! 😄
This is not the same as locking, but it would do the job 🙂
n
Great!
s
(for server mode)
n
That should be available soon.
s
Damn, just one problem: we have to limit the parallel execution of a certain flow for a certain parameter/schedule time.
A `label` is fixed from the time you register a flow, right?
n
That's correct, but that's where you could use schedule parameters/context, I think
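One way to turn "a certain flow for a certain parameter/schedule time" into a single value that could be stored in a run's parameters or context (for example under the hypothetical `lock_tag` key) and matched by a query is to derive a deterministic key. The helper, the key format, and the minute-level slot are illustrative assumptions, not Prefect conventions.

```python
from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone


def lock_key(flow_name: str, parameters: dict, scheduled_start: datetime) -> str:
    """Same flow + same parameters + same scheduled slot -> same key (hypothetical helper)."""
    if scheduled_start.tzinfo is None:
        raise ValueError("scheduled_start must be timezone-aware")
    slot = scheduled_start.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M")
    # sort_keys makes the digest independent of the parameters' key order
    canonical = json.dumps(parameters, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]
    return f"{flow_name}:{slot}:{digest}"
```

Two runs of the same flow scheduled for the same slot with the same parameters get the same key and therefore compete for the same lock, while other slots or parameter sets stay independent.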
s
As discussed above?
I'll try to implement this as a prototype or proof of concept. I think it's not that hard. Then I will create an enhancement PR to discuss it with the other maintainers.
(I don't like maintaining a fork.)
Okay?
n
Well, a combination of the above (which is highly customizable) and a complex schedule sounds like it should accomplish what you're looking for.
That sounds great!
s
👍