Chris L.
08/21/2022, 9:32 AMimport asyncio
from prefect import task, flow
@task
async def print_x(x):
print(x)
await asyncio.sleep(1)
def build_subflow(name):
@flow(name=f"subflow:{name}")
async def subflow(x):
await print_x(x)
return subflow
@flow
async def parent_flow():
futures = [build_subflow(name=x)(x) for x in ["a", "b", "c"]]
await asyncio.gather(*futures)
@flow(name=f"subflow:{name}")
is replaced with just @flow
. Prefect gives a warning "A flow named 'subflow'...conflicts with another flow. Consider specifying a unique name
parameters in the flow definition", but this does not seem to prevent the curried subflows from running concurrently.Anna Geller
08/21/2022, 12:29 PMimport asyncio
from prefect import task, flow, get_run_logger
@task
async def print_x(x):
logger = get_run_logger()
<http://logger.info|logger.info>(x)
await asyncio.sleep(1)
def build_subflow():
@flow
async def subflow(x):
await print_x(x)
return subflow
@flow
async def parent_flow():
futures = [build_subflow()(x) for x in ["x1", "x2", "x3"]]
await asyncio.gather(*futures)
if __name__ == "__main__":
asyncio.run(parent_flow())
it also works, as you mentioned but with a warning, but this is still nicer as it keeps the same flow ID.
Thanks again, I appreciate that you dive deeper into this topicKhuyen Tran
08/22/2022, 2:52 PMChris L.
08/25/2022, 4:43 AMreturn_exceptions=True
in asyncio.gather
, otherwise if any subflow raised an exception, the whole parent flow fails immediately.
2. Also added this extra post asyncio gather check:
states = await asyncio.gather(*subflows, return_exceptions=True)
if all(state.is_failed() for state in states):
raise ValueError("🚒 All subflows failed")
@Anna Geller I was looking into the Prefect Orion codebase and noticed this StateGroup
class that takes a sequence of states and also has some convivence functions i.e. (all_completed
, any_failed
). Wondering if this class is meant for public use? state.type
and <http://StateType.XYZ|StateType.XYZ>
?isinstance(state, somestateclass)
checks in the codebase, but I don't see any examples of this in Orion. Wondering if any good usage tips have come up regarding state type checking since GA?state.is_completed() and state.is_crashed
etcAnna Geller
08/25/2022, 12:04 PMotherwise if any subflow raised an exception, the whole parent flow fails immediatelyI think this is intentional but we have a separate issue to make it more configurable for subflows in general and make subflows submittable like tasks