Fina Silva-Santisteban
11/07/2023, 7:04 PM
Kevin Grismore
11/07/2023, 7:10 PM
Kevin Grismore
11/07/2023, 7:18 PM
Fina Silva-Santisteban
11/07/2023, 9:05 PM
many flow runs, each with a different string input. Something that I forgot to mention is that there are other flow parameters that do not change for that list of strings. Currently the flow, as it is, is one unit of work. Stakeholders have a list of strings that they run the flow for, e.g. a flow run for string A with additional parameters Y, Z, then a flow run for string B with the same parameters Y, Z, then a flow run for string C with parameters Y, Z, etc. The units of work, i.e. the flow runs, could all start at the same time or could be completed in sequence; as long as the flows run, that isn't important at this point. I hope this helps clarify the use case! Please let me know what approach you'd suggest!
Kevin Grismore
11/07/2023, 9:16 PM
Kevin Grismore
11/07/2023, 9:19 PM
Fina Silva-Santisteban
11/07/2023, 9:35 PM
On Monday, stakeholder Jane needs to run the flow compute_things for a list of clients. That list of clients is always in the same format, let's say valid email addresses. The other parameters the flow needs are the same for all clients in that list, e.g. language and currency. If a flow run fails, Jane moves on to the next client.
On Tuesday, Jane needs to run compute_things for a different list of clients. The other parameters, language and currency, may be the same ones she used on Monday or may be new ones.
Currently, Jane manually starts a compute_things run for each client in the list. She'd much rather provide compute_things with that list of clients and call it a day. :D
Our compute_things flow is set up to run on AWS ECS. (Thanks to the help I received previously from the Prefect community!)
Is there a way to implement what I need using Prefect? To minimize maintenance, I'd prefer not to set up any 'layers' on top, e.g. write an API that takes the parameters, iterates through them, and sends flow run requests. Looking forward to your advice!
Kevin Grismore
11/07/2023, 10:27 PM
Kevin Grismore
11/07/2023, 10:34 PM
compute_many_things, that calls run_deployment in a loop to run compute_things on each email, which will start a separate container in ECS for every run. Each of the child deployment flow runs is independent of the others, and a failure of one won't stop the others from running.
Fina Silva-Santisteban
11/07/2023, 11:10 PM
.map() and that worked out ok! The only annoying thing is that I have to .map() quite a lot of times since our actual flow is more complex, which can make things harder to understand.
How would I go about the parent deployment option? I think Prefect used to have flow of flows, is that what you mean? I can't find anything about it anymore in the docs!
Fina Silva-Santisteban
11/07/2023, 11:12 PM
.map() is Prefect's version of a map/reduce, so each thread runs independently and a failure wouldn't affect any of the others. Did I get that right?
Kevin Grismore
11/07/2023, 11:30 PM
from prefect import flow, task


@flow(log_prints=True)
def compute_many_things(emails: list[str], language: str, currency: str):
    # One task run per email; language and currency are passed to every run.
    compute_things.map(emails, language, currency)


@task
def compute_things(email: str, language: str, currency: str):
    # Deliberately fail for one address to show how failures are handled.
    if email == "fail@this.email":
        raise ValueError("planned failure")


if __name__ == "__main__":
    compute_many_things(
        emails=["fail@this.email", "run@this.email"],
        language="English",
        currency="Dollars",
    )
I get this outcome
18:21:49.861 | ERROR | Task run 'compute_things-0' - Finished in state Failed('Task run encountered an exception ValueError: planned failure')
18:21:49.907 | INFO | Task run 'compute_things-1' - Finished in state Completed()
18:21:50.050 | ERROR | Flow run 'unique-shark' - Finished in state Failed('1/2 states failed.')
So the second task still runs, even though the first failed. You can also iterate over tasks with native Python loops if you want, though a failure will then stop the whole flow, like I mentioned before.
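To illustrate the native-loop trade-off mentioned here, below is a minimal sketch in plain Python rather than Prefect code. The `compute_things` function is only a stand-in for the real unit of work, and `run_all` is a hypothetical helper that is not part of the thread. It shows that wrapping each iteration in a try/except is what keeps one failure from ending the loop.

```python
def compute_things(email: str, language: str, currency: str) -> str:
    """Stand-in for the real unit of work; fails for one known address."""
    if email == "fail@this.email":
        raise ValueError("planned failure")
    return f"{email}: computed in {language}/{currency}"


def run_all(emails: list[str], language: str, currency: str) -> dict[str, str]:
    """Process every email in sequence, recording failures instead of stopping."""
    outcomes: dict[str, str] = {}
    for email in emails:
        try:
            outcomes[email] = compute_things(email, language, currency)
        except ValueError as exc:
            # Without this handler, the first failure would end the loop
            # and the remaining emails would never be processed.
            outcomes[email] = f"failed: {exc}"
    return outcomes


outcomes = run_all(["fail@this.email", "run@this.email"], "English", "Dollars")
print(outcomes)
```

Inside a flow, the same per-item try/except around each task call is one possible way to keep a loop going after a failure, assuming the task call raises the exception as an ordinary function call would.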
The flow of flows approach might be a better fit then, using run_deployment.
Kevin Grismore
11/07/2023, 11:35 PM
from prefect import flow
from prefect.deployments import run_deployment


@flow
def compute_many_things(emails: list[str], language: str, currency: str):
    # Trigger one run of the existing deployment per email.
    for email in emails:
        run_deployment(
            name="compute-things/my-deployment",
            parameters={"email": email, "language": language, "currency": currency},
            timeout=0,  # return immediately instead of waiting for the run to finish
        )
and running it will just kick off runs of your existing deployed flow, once for each email in the emails list.
Fina Silva-Santisteban
11/07/2023, 11:50 PM
.map() approach provides: I can have all the results in a single markdown artifact! I think that'll be most helpful for my stakeholders. Thanks so much for all of your help Kevin! I've learned a lot!
Kevin Grismore
11/07/2023, 11:53 PM