# show-us-what-you-got
Hello Prefect community. 👋 Wanted to share a small code snippet I've been using to resolve this open issue: https://github.com/PrefectHQ/prefect/issues/5853 It's not technically a "fix" as I am creating new functions in the namespace via currying 🍛, but I think it's a cleaner solution compared to the current hack of using "deepcopy". @Jacob Danovitch @Jeff Carrico @Anna Geller Hope this helps others encountering this issue!
```python
import asyncio
from prefect import task, flow


@task
async def print_x(x):
    print(x)
    await asyncio.sleep(1)


def build_subflow(name):
    # each call defines a brand-new flow object with its own unique name
    @flow(name=f"subflow:{name}")
    async def subflow(x):
        await print_x(x)

    return subflow


@flow
async def parent_flow():
    futures = [build_subflow(name=x)(x) for x in ["a", "b", "c"]]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
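For anyone without Prefect installed, the core idea behind `build_subflow` can be sketched in plain `asyncio`: a factory (the name `build_worker` is hypothetical) returns a freshly defined coroutine function on every call, so each call site gets its own function object and its own name. This only illustrates the closure/factory pattern; it is not Prefect's API.

```python
import asyncio


def build_worker(name):
    # Each call defines a new coroutine function that closes over `name`.
    async def worker(x):
        await asyncio.sleep(0.1)  # stand-in for real work
        return f"{name}:{x}"

    worker.__name__ = f"worker_{name}"  # distinct name per factory call
    return worker


async def parent():
    names = ["a", "b", "c"]
    workers = [build_worker(n) for n in names]
    # Call each fresh function and run the resulting coroutines concurrently.
    return await asyncio.gather(*(w(x) for w, x in zip(workers, names)))


results = asyncio.run(parent())
print(results)  # ['a:a', 'b:b', 'c:c']
```

`asyncio.gather` returns results in the order the awaitables were passed, so the output order is stable even though the workers run concurrently.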
An interesting finding: this pattern works even if `@flow(name=f"subflow:{name}")` is replaced with just `@flow`. Prefect gives a warning, "A flow named 'subflow'...conflicts with another flow. Consider specifying a unique `name` parameters in the flow definition", but this does not seem to prevent the curried subflows from running concurrently.
Thanks for sharing, nice workaround! I'll add it to the GitHub issue too. When we rename flows, Prefect treats them as separate flows and assigns each one a different flow ID; that's why it works. Without the renaming, i.e. when doing:
```python
import asyncio
from prefect import task, flow, get_run_logger


@task
async def print_x(x):
    logger = get_run_logger()
    logger.info(x)
    await asyncio.sleep(1)


def build_subflow():
    # no explicit name: every subflow is named "subflow", so Prefect warns
    @flow
    async def subflow(x):
        await print_x(x)

    return subflow


@flow
async def parent_flow():
    futures = [build_subflow()(x) for x in ["x1", "x2", "x3"]]
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(parent_flow())
```
It also works, as you mentioned, albeit with a warning, but this version is still nicer since all the subflow runs keep the same flow ID. Thanks again, I appreciate you diving deeper into this topic!
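A toy model of the behaviour described here, assuming flows are tracked by name. `ToyFlowRegistry` and `make_fn` are entirely hypothetical and are not Prefect's internals: renamed subflows each get their own ID, while reusing a single name maps every copy to the same ID and, because each copy is a different function object, triggers a conflict warning similar in spirit to the one quoted earlier in the thread.

```python
import uuid
import warnings


class ToyFlowRegistry:
    """Hypothetical name -> ID registry (illustration only, not Prefect)."""

    def __init__(self):
        self._ids = {}  # name -> flow ID
        self._fns = {}  # name -> function object registered most recently

    def register(self, name, fn):
        if name in self._fns and self._fns[name] is not fn:
            warnings.warn(f"A flow named {name!r} conflicts with another flow.")
        self._fns[name] = fn
        # Reuse the existing ID for a known name, otherwise mint a new one.
        return self._ids.setdefault(name, uuid.uuid4())


def make_fn():
    def subflow(x):
        return x

    return subflow


registry = ToyFlowRegistry()

# Renamed flows: one ID per unique name.
renamed_ids = {registry.register(f"subflow:{n}", make_fn()) for n in "abc"}

# Same name reused: the same ID every time, plus a warning from the second call on.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    shared_ids = {registry.register("subflow", make_fn()) for _ in range(3)}

print(len(renamed_ids), len(shared_ids), len(caught))  # 3 1 2
```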
Thanks for the fix, @Chris L.!
A few extra things I had to do to make this "production ready":
1. I had to specify `return_exceptions=True` in `asyncio.gather`; otherwise, if any subflow raised an exception, the whole parent flow failed immediately.
2. I also added this extra check after `asyncio.gather`:
Copy code
states = await asyncio.gather(*subflows, return_exceptions=True)
    if all(state.is_failed() for state in states):
        raise ValueError("🚒 All subflows failed")
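To see why `return_exceptions=True` matters, here is a Prefect-free sketch using plain coroutines; `ok` and `boom` are hypothetical stand-ins for subflows. Without the flag, `gather` re-raises the first exception in the awaiting coroutine (the parent). With it, exceptions come back as entries in the result list, in input order, so any post-gather check has to handle exception objects alongside regular results.

```python
import asyncio


async def ok(x):
    await asyncio.sleep(0.01)
    return x


async def boom(x):
    await asyncio.sleep(0.01)
    raise RuntimeError(f"subflow {x} failed")


async def without_flag():
    try:
        await asyncio.gather(ok(1), boom(2), ok(3))
    except RuntimeError as exc:
        # The first exception propagates straight to the caller.
        return f"parent failed: {exc}"


async def with_flag():
    # Exceptions are returned in place of results, preserving input order.
    return await asyncio.gather(ok(1), boom(2), ok(3), return_exceptions=True)


async def all_failed(*coros):
    results = await asyncio.gather(*coros, return_exceptions=True)
    # In this sketch, a returned exception counts as a failure; anything else as success.
    return all(isinstance(r, BaseException) for r in results)


print(asyncio.run(without_flag()))  # parent failed: subflow 2 failed
results = asyncio.run(with_flag())
print(results[0], type(results[1]).__name__, results[2])  # 1 RuntimeError 3
print(asyncio.run(all_failed(boom(1), boom(2))))  # True
```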
@Anna Geller I was looking into the Prefect Orion codebase and noticed this `StateGroup` class that takes a sequence of states and also has some convenience functions (e.g. `all_completed`, `any_failed`). Is this class meant for public use? Or should I continue using equality checks between `state.type` and `StateType.XYZ`?
In Prefect 1.0, I used a lot of `isinstance(state, somestateclass)` checks in the codebase, but I don't see any examples of this in Orion. Have any good usage tips come up regarding state type checking since GA?
Just discovered `state.is_completed()`, `state.is_crashed()`, etc.
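If you'd rather not depend on an internal class like `StateGroup`, small local helpers built on the `is_completed()` / `is_failed()` / `is_crashed()` methods are one option. The sketch below uses hypothetical `FakeStateType` / `FakeState` stand-ins so it runs without Prefect; the helpers `all_completed` and `any_failed_or_crashed` are also hypothetical and are not Prefect's `StateGroup` implementation. Treating exceptions returned by `asyncio.gather(..., return_exceptions=True)` as failures is an assumption about how you want to handle them.

```python
from dataclasses import dataclass
from enum import Enum


class FakeStateType(Enum):
    # Hypothetical stand-in for Prefect's StateType enum.
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CRASHED = "CRASHED"


@dataclass
class FakeState:
    # Hypothetical stand-in exposing the same method names as a Prefect state.
    type: FakeStateType

    def is_completed(self):
        return self.type == FakeStateType.COMPLETED

    def is_failed(self):
        return self.type == FakeStateType.FAILED

    def is_crashed(self):
        return self.type == FakeStateType.CRASHED


def all_completed(states):
    # An exception object (from return_exceptions=True) never counts as completed.
    return all(not isinstance(s, BaseException) and s.is_completed() for s in states)


def any_failed_or_crashed(states):
    # Exceptions count as failures, alongside FAILED and CRASHED states.
    return any(
        isinstance(s, BaseException) or s.is_failed() or s.is_crashed() for s in states
    )


states = [FakeState(FakeStateType.COMPLETED), FakeState(FakeStateType.CRASHED)]
print(all_completed(states), any_failed_or_crashed(states))  # False True
```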
I'd say no, i.e. it's not meant for modification by users, but it's OSS in the end, so it's fine to change the code. I believe it may be beneficial to add your additional findings to the linked GitHub issue; it could help us solve it directly without workarounds.
> otherwise if any subflow raised an exception, the whole parent flow fails immediately
I think this is intentional, but we have a separate issue to make this more configurable for subflows in general and to make subflows submittable like tasks.