# ask-community
n
Hi all! We'd like to use Prefect to handle some workflows in our webapp, which gets deployed in Kubernetes. For reliability, our webapp uses the k8s `replicas` setting. In addition, the app is behind gunicorn. All this means we have multiple instances of our backend running concurrently. As such, if we register a flow in each backend, Prefect will receive multiple register requests for the same flow, which is obviously wrong... How should I handle this use case? How can I have multiple instances of my application that use the same flow? We thought of performing a search for the flow and creating it if not present, but then the flow's name (or whatever is used to perform the search) would become the unique key identifying a flow, and from what I'm reading about Prefect, a flow name is not the unique key to identify a flow. Any idea? Thanks!!
d
Hi @Nicolas Bigaouette! I’d suggest having an agent in each backend with a different label and then using Flow Affinity to send Flow Runs to each in turn.
I’m looking for the right doc in addition, one moment
For each Clock in a schedule, you can pass labels to the flow run
So you can have one flow that runs in all of your environments
You can also set default parameters on each schedule for additional configuration
n
Hi Dylan! (Sorry for the late response, meeting! 😄 ) I am not sure I understand what you are proposing. I don't expect to have multiple agents; I have multiple instances of my backend application. When those 4 instances start, "something" needs to register the flow. Should only one instance call `flow.register()`, or should all four instances call it? If they all call `register()` (with the exact same code), will Prefect think that multiple versions of the flow exist, or will the four instances be able to run the same exact flow?
j
Would this work? https://docs.prefect.io/orchestration/concepts/flow_runs.html#idempotent-run-creation You can pass an `idempotency_key` when you trigger a Flow run, and any other runs triggered over the next 24 hours with that same key will be disregarded
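To make that concrete, here's a minimal sketch of how several replicas could derive a shared key. Only the `idempotency_key` argument to `create_flow_run` comes from the docs linked above; the `run_idempotency_key` helper and the event-id scheme are hypothetical.

```python
import hashlib

def run_idempotency_key(flow_name: str, event_id: str) -> str:
    # Hypothetical helper: derive a deterministic key from the flow name
    # and the event that triggered the run, so every backend replica
    # reacting to the same event computes the same key.
    return hashlib.sha256(f"{flow_name}:{event_id}".encode()).hexdigest()

key = run_idempotency_key("my-flow", "event-42")

# Each replica can then call (sketch, assuming a configured prefect.Client):
# client.create_flow_run(flow_id=flow_id, idempotency_key=key)
# and Prefect disregards the duplicate calls within the 24-hour window.
```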
n
Hum... I'm not looking at the GraphQL API for now; I guess the `prefect` package API provides the same options? In any case, that is for running a flow. If I understand correctly, I could ask Prefect to run a specific flow from all the instances of my backend (passing the idempotency key) and Prefect will run the flow only once.
But my question is more about what happens (or how to handle what happens) before running a flow. Can my multiple backends load/share the same flow code without Prefect being confused and thinking there are multiple "different" versions of the flow?
According to https://docs.prefect.io/orchestration/tutorial/first.html#register-flow-with-the-prefect-api the `idempotency_key` flag can also be used for the `flow.register()` call, including passing the code hash. So this becomes possible:
```python
flow.register(
    project_name="Hello, World!",
    idempotency_key=flow.serialized_hash(),
)
```
Which seems to do what I need. But note that `flow.serialized_hash()` was introduced in Prefect 0.13.14, which was released... a week ago 😄 And it seems it's buggy too: https://github.com/PrefectHQ/prefect/issues/3653 (a simple fix, I think, that should land soon)
d
It sounds like you should use the same flow with a parameter for whichever backend the specific flow run needs to talk to
If you need to use the same logic in different places that’s the pattern I would suggest
However, it seems like your web app is calling `flow.register`? Can you give me a 10,000-foot view of your business use case or the problem you’re trying to solve? I don’t think I have enough information to give you a good answer just yet
n
Thanks Dylan! I'll try to explain my business case. I believe I am not using Prefect exactly as it was designed, so a clear understanding of what I am trying to achieve may clarify things.

Our webapp is in need of a workflow engine. It reacts to some events (user interaction from a frontend, data appearing somewhere, etc.) and needs to perform some complex tasks based on these events. Right now we have some kind of linear step execution which is becoming hard to extend and maintain. Our setup is composed of a couple of components: a React frontend, a db, and a backend that runs inside k8s. We also have an on-premise agent at our clients' location which, from behind the client's firewall, polls our backend regularly for commands to execute. For reliability and scalability, the k8s cluster runs multiple instances of our backend (there are four instances running concurrently, accessing the same db). Each backend instance exposes the same REST API (since it's the same code/container) that both the agent and the frontend access. Because there are four instances of the backend running, REST requests are not guaranteed to land on the same backend instance.

To simplify the discussion here, let's assume we have a single Prefect flow that needs to be executed. Since the different backend instances are actually running the same code, we want to be able to define the flow in a way that any instance can run it (for example in reaction to a REST request). So I need some way to let each instance "be aware" of the flow. I say "be aware" because I am not sure how to achieve this, maybe with `flow.register()` or maybe something else. If I simply let each instance `register()` the same flow, for example at application start up, Prefect interprets this as four different versions of the same flow, and only the last one registered is active (if I understand correctly). This complicates things, as our Prefect setup (i.e. the active flows) would then depend on the number of replicas.

Right now what we are trying to do is to have a python module (say `flowmodule`) with the `flow` defined in it (using the context manager `with Flow() as flow`). Importing this module defines the flow (that might be wrong; maybe we should have some kind of setup function that builds the flows and returns them?). Then somewhere else in the code (for example in a REST route) we need to execute the flow based on data. To achieve this I create a `prefect.Client()` pointing to the Prefect server, then call `flow_id = flowmodule.flow.register(idempotency_key=flow_hash)`. Using a `flow_hash` that is shared by all instances prevents multiple versions being registered with Prefect. Then to run the flow we call `client.create_flow_run(flow_id=flow_id)`. This way, any backend instance can run the same required flow when required.

I hope the description makes my use case a bit more clear... Do you think the way I define, register and run the flows makes sense? Thanks a lot for your input!
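The register-then-run pattern described above could be sketched as below. The Prefect calls themselves need a reachable server, so they are shown commented out; `shared_flow_hash`, the project name, and the module layout are hypothetical, with `flow.serialized_hash()` (Prefect >= 0.13.14) being the built-in alternative for computing the shared key.

```python
import hashlib

def shared_flow_hash(flow_source: str) -> str:
    # Hypothetical stand-in for flow.serialized_hash(): any stable hash of
    # the flow's definition works, as long as every replica (running the
    # same container image) computes the same value for the same code.
    return hashlib.sha256(flow_source.encode()).hexdigest()

# Sketch of the per-replica startup/run path:
#
# import prefect
# from flowmodule import flow          # module-level `with Flow() as flow`
#
# client = prefect.Client()
# flow_id = flow.register(
#     project_name="my-project",       # assumed project name
#     idempotency_key=flow.serialized_hash(),  # or shared_flow_hash(...)
# )
# client.create_flow_run(flow_id=flow_id)

sample = shared_flow_hash("with Flow('my-flow') as flow: ...")
```

Because the idempotency key is derived from the code, the four replicas registering at startup collapse into a single registered version, and re-registering only creates a new version when the flow's code actually changes.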
j
Could you make your Flow parametrized so it works for any user (assuming they provide the right data/params), and register it outside of your backend app? Part of me thinks your application should just treat Prefect + the Flow(s) as an external service that is assumed to exist. You could have a single initContainer in your `Deployment` that registers the Flow, so the actual application backend replicas need not worry about registration at all
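An illustrative excerpt of what that could look like in the Deployment manifest; every name, image, and command here is a placeholder, not something from the thread. Note that an initContainer still runs once per pod, so pairing it with an `idempotency_key` (or registering from CI/CD instead) keeps the duplicate registrations harmless:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webapp-backend        # placeholder name
spec:
  replicas: 4
  selector:
    matchLabels:
      app: webapp-backend
  template:
    metadata:
      labels:
        app: webapp-backend
    spec:
      # Runs before the app container in each pod; with an idempotency
      # key, the extra registrations across the 4 pods are no-ops.
      initContainers:
        - name: register-flows
          image: myregistry/webapp-backend:latest   # placeholder image
          command: ["python", "-m", "flowmodule.register"]  # hypothetical entry point
      containers:
        - name: backend
          image: myregistry/webapp-backend:latest
```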
d
Ahh @Nicolas Bigaouette that helps a ton, thanks for that explanation 💯 I definitely agree with Joseph here. Prefect Cloud could definitely be treated as an external service here. Registering a flow is how you tell Prefect Cloud about a new flow or a new version of a flow (so, when the python code of the flow itself needs to change). If each instance needs to call the same flow with different Parameters, I’d treat Prefect Cloud as an external service and call `client.create_flow_run(version_group_id="my_id")`. This way, your web app will always call the latest version of the Prefect Flow and you don’t need to change the code in your web app at all to change the flow. Check out this blog post for a similar example where a Lambda kicks off a parameterized flow run: https://medium.com/the-prefect-blog/event-driven-workflows-with-aws-lambda-2ef9d8cc8f1a