Hello Prefect community. :wave: Wanted to share a ...
# show-us-what-you-got
Hello Prefect community. 👋 Wanted to share a small code snippet I've been using to resolve this open issue: https://github.com/PrefectHQ/prefect/issues/5853 It's not technically a "fix" as I am creating new functions in the namespace via currying 🍛, but I think it's a cleaner solution compared to the current hack of using "deepcopy". @Jacob Danovitch @Jeff Carrico @Anna Geller Hope this helps others encountering this issue!
Copy code
import asyncio
from prefect import task, flow

async def print_x(x):
  await asyncio.sleep(1)

def build_subflow(name):
  async def subflow(x):
    await print_x(x)
  return subflow

async def parent_flow():
  futures = [build_subflow(name=x)(x) for x in ["a", "b", "c"]]
  await asyncio.gather(*futures)
gratitude thank you 1
A interesting finding: this pattern works even if
is replaced with just
. Prefect gives a warning "A flow named 'subflow'...conflicts with another flow. Consider specifying a unique
parameters in the flow definition", but this does not seem to prevent the curried subflows from running concurrently.
Thanks for sharing, nice workaround! I'll add it to the GitHub issue too. When we rename flows, Prefect treats those as separate flows and will assign a different flow ID - that's why it works. When doing:
Copy code
import asyncio
from prefect import task, flow, get_run_logger

async def print_x(x):
    logger = get_run_logger()
    await asyncio.sleep(1)

def build_subflow():
    async def subflow(x):
        await print_x(x)

    return subflow

async def parent_flow():
    futures = [build_subflow()(x) for x in ["x1", "x2", "x3"]]
    await asyncio.gather(*futures)

if __name__ == "__main__":
it also works, as you mentioned but with a warning, but this is still nicer as it keeps the same flow ID. Thanks again, I appreciate that you dive deeper into this topic
💯 1
Thanks for the fix, @Chris L.!
🙌 1
A few extra things I had to do to make this "production ready": 1. I had to specify
, otherwise if any subflow raised an exception, the whole parent flow fails immediately. 2. Also added this extra post asyncio gather check:
Copy code
states = await asyncio.gather(*subflows, return_exceptions=True)
    if all(state.is_failed() for state in states):
        raise ValueError("🚒 All subflows failed")
@Anna Geller I was looking into the Prefect Orion codebase and noticed this
class that takes a sequence of states and also has some convivence functions i.e. (
). Wondering if this class is meant for public use? Or should I continue using equality checks between
In Prefect 1.0, I used a lot of
isinstance(state, somestateclass)
checks in the codebase, but I don't see any examples of this in Orion. Wondering if any good usage tips have come up regarding state type checking since GA?
Just discovered
state.is_completed() and state.is_crashed
I'd say no i.e. not meant for modification by users, but it's OSS in the end, so fine to change the code I believe it may be beneficial to add your additional findings to the linked GitHub issue -- it could help us solve it directly without workarounds
otherwise if any subflow raised an exception, the whole parent flow fails immediately
I think this is intentional but we have a separate issue to make it more configurable for subflows in general and make subflows submittable like tasks