# ask-community
f
Hi everyone! Best practices question: one of my flow parameters could either be a list or a string. If it's a list I'd have to iterate through it and apply the rest of the flow logic to each item individually. What's the best way to go about it?
k
If I'm understanding the question right, the comparison here is between:
• many flow runs, each with a different string input
• one flow run, many iterations of the logic within
To me it seems like a matter of what qualifies the flow as a unit of work, especially if you plan to deploy and schedule it. Are there time constraints? Does it matter if each iteration is completed in sequence, or should they all be started at the same time? Is it CPU-intensive or IO-bound? Does running with the full spectrum of inputs fit as part of a single conceptual whole, or does each run, with its differing input, need to be surfaced individually for visibility?
f
Hi @Kevin Grismore, thank you for your thoughtful reply! The comparison is exactly right, and in fact currently the flow is used by stakeholders as
many flow runs, each with a different string input
. Something that I forgot to mention is that there are other flow parameters that do not change for that list of strings. Currently the flow as it stands is a unit of work. Stakeholders have a list of strings that they run the flow for, e.g. a flow run for string A with additional parameters Y, Z, then a flow run for string B with the same parameters Y, Z, then a flow run for string C with parameters Y, Z, etc. Each unit of work (each flow run) could start at the same time or be completed in sequence; as long as the flows run, the order isn't important at this point. I hope this helps clarify the use case! Please let me know what approach you'd suggest! šŸ™
k
A few more questions:
• are different inputs managed by different stakeholders?
• do these inputs belong to a preexisting set, or could they be anything someone wants?
• is the work considered complete when runs with all inputs are complete, or does the number of runs needed at a given time vary?
• are flow runs created by a deployment? and if so, is it scheduled, run ad hoc, or triggered by some other mechanism?
Even without having these answers, given that running them in sequence sounds acceptable, as long as they all need to be run, passing them to the flow as a list seems good. You could write a default value for the list in your flow function's parameters, and then combine that with all the ways Prefect can override that default via deployments, whether they're packaged with the deployment or written in the UI while setting up a custom run.
If different stakeholders have different sets of inputs that they want visibility on, it may be worth creating multiple deployments of the same flow with parameters that map to those needs.
f
@Kevin Grismore ohh interesting questions, ok let's see if this more detailed scenario explains things:
On Monday, stakeholder Jane needs to run flow compute_things for a list of clients. That list of clients is always in the same format, let's say valid email addresses. The other parameters the flow needs are the same for all clients in that list, e.g. language and currency. If a flow run fails Jane moves on to the next client.

On Tuesday, Jane needs to run compute_things for a different list of clients. The other parameters, language and currency, may be the same ones she used on Monday or may be new ones.

Currently what Jane does is to manually start a compute_things run for each client in the list. She'd much rather provide compute_things with that list of clients and call it a day. :D
Our 'compute_things' flow is set up to run on AWS ECS. (Thanks to the help I received previously from the Prefect community!) Is there a way to implement what I need using Prefect? I'd prefer not to set up any 'layers' on top, e.g. write an API that takes the parameters, iterates through them, and sends flow run requests, to minimize maintenance. Looking forward to your advice! šŸ‘‚ šŸ™
k
Yep, then having a list of strings as input that you iterate over makes perfect sense to me. The most important thing to look out for is managing failure, since if an earlier iteration fails, later ones won't run unless you've got exception handling or other strategies for reacting to failure.
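That per-item exception handling could be sketched in plain Python like this (a hypothetical `process_client` stands in for the real per-client flow logic; none of these names come from the actual flow):

```python
def process_client(email: str) -> str:
    # Stand-in for the per-client work; fails for one specific address.
    if email == "fail@this.email":
        raise ValueError("planned failure")
    return f"processed {email}"


def process_all(emails: list[str]) -> tuple[dict, dict]:
    # Catch each failure and keep iterating, instead of letting the
    # first exception abort the remaining clients.
    results, failures = {}, {}
    for email in emails:
        try:
            results[email] = process_client(email)
        except Exception as exc:
            failures[email] = str(exc)
    return results, failures
```

The same try/except-per-iteration shape works inside a flow function, so one bad client is recorded rather than ending the run.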
Or you could have a parent deployment,
compute_many_things
, that calls
run_deployment
in a loop to run
compute_things
on each email, which will start a separate container in ECS for every run. Each of the child deployment flow runs is independent of the others, and a failure of one won't stop the others from running.
f
@Kevin Grismore I just implemented your first suggestion ('list of strings as input that you iterate over makes perfect sense to me') using
.map()
and that worked out ok! The only annoying thing is that I have to
.map()
quite a lot of times since our actual flow is more complex, which can make things harder to understand. šŸ˜… How would I go about the parent deployment option? I think Prefect used to have flow of flows, is that what you mean? I can't find anything about it anymore in the docs!
Side question: from what I understood,
.map()
is Prefect's version of a map/reduce, so each task run executes independently and a failure in one wouldn't affect any of the others. Did I get that right?
k
That's true! With this code
from prefect import flow, task


@flow(log_prints=True)
def compute_many_things(emails: list[str], language: str, currency: str):
    compute_things.map(emails, language, currency)


@task
def compute_things(email: str, language: str, currency: str):
    if email == "fail@this.email":
        raise ValueError("planned failure")


if __name__ == "__main__":
    compute_many_things(
        emails=["fail@this.email", "run@this.email"],
        language="English",
        currency="Dollars",
    )
I get this outcome
18:21:49.861 | ERROR   | Task run 'compute_things-0' - Finished in state Failed('Task run encountered an exception ValueError: planned failure')
18:21:49.907 | INFO    | Task run 'compute_things-1' - Finished in state Completed()
18:21:50.050 | ERROR   | Flow run 'unique-shark' - Finished in state Failed('1/2 states failed.')
So the second task still runs, even though the first failed. But you can also iterate over tasks with native Python loops if you want, though then a failure will stop the whole flow, like I mentioned before. The flow of flows approach might be a better fit then, using run_deployment.
Then your parent flow might look something like this:
from prefect import flow
from prefect.deployments import run_deployment


@flow
def compute_many_things(emails: list[str], language: str, currency: str):
    for email in emails:
        run_deployment(
            name="compute-things/my-deployment",
            parameters={"email": email, "language": language, "currency": currency},
            timeout=0,
        )
and running it will just kick off runs of your existing deployed flow, once for each email in the
emails
list.
🤩 1
f
@Kevin Grismore Very interesting!! So the flow of flows approach is still around, it's just not highlighted and mentioned by that name anymore in the docs it seems. I thought of one advantage that the
.map()
approach provides: I can have all the results in a single markdown artifact! I think that'll be most helpful for my stakeholders. Thanks so much for all of your help Kevin! I've learned a lot! 😃 šŸ™
k
flow functions can call other flow functions in a single run as well, but then their states will be more closely tied together.
āœ… 1
šŸ’” 1