# prefect-contributors
# Co-routine tasks

I was considering creating a “PIN” (or two) concerning “co-routine” tasks. I thought I’d write here first to get some initial feedback.

---

A co-routine task corresponds to a co-routine in Python. It runs multiple times, consuming marked parameters, before optionally transitioning back to a pending state, ready to run again if and when new versions of the parameters are available. It may be implemented in Python as an actual co-routine, but could also be implemented using signals.

## Use cases

### Loops over complex sub-graphs in flows

Currently, it is impossible to loop over a subgraph of a flow. A “co-routine” is a primitive that could function as a building block for while loops with complex substructure. Tasks downstream of a “loop controller” co-routine task would run after each execution of the loop body, but could have edges that feed parameters back up to the loop control task.

### Processing external streams

The current architecture is not well suited to processing large event streams, such as a Kafka or NATS stream that could have hundreds or thousands of events per second. Processing such streams should not have to spin up an individual flow per event; at the same time, it should not be limited to a single long-running event that handles all processing internally. A co-routine task can be triggered for each event, waiting in a pending state until an event arrives.

## Implementation

Both internal “loop back” edges and external edges would be specially marked as co-routine inputs, so they could be consumed per iteration and waited on for the next iteration. A Pending->Coroutine State would be added to represent running coroutines.

Internally, co-routine tasks could use Dask pub/sub. For external links, the server/cloud could provide a webhook. As a further extension, this could optionally support an upgrade to a websocket; as another extension, connectors to standard event queues could be provided.
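To make the run/suspend cycle described above more concrete, here is a minimal sketch in plain Python. It does not use any Prefect API: `CoroutineTask`, the `State` names, and `running_total` are all hypothetical names chosen for illustration, and the "marked input" is simulated by calling `send()` directly, where a real implementation would presumably be driven by a pub/sub message or a webhook.

```python
from enum import Enum
from typing import Any, Callable, Generator, List


class State(Enum):
    # Hypothetical state names, not Prefect's actual state classes.
    PENDING = "Pending"      # waiting for a new version of a marked input
    RUNNING = "Running"      # consuming one input
    FINISHED = "Finished"    # the coroutine body returned


def running_total() -> Generator[int, int, None]:
    """Coroutine body: receives one value per iteration, yields the running sum."""
    total = 0
    value = yield total          # suspend until the first input arrives
    while True:
        total += value
        value = yield total      # suspend again until the next input


class CoroutineTask:
    """Hypothetical wrapper tracking state around a generator-based coroutine."""

    def __init__(self, body: Callable[[], Generator[Any, Any, None]]) -> None:
        self._gen = body()
        next(self._gen)          # prime the generator up to its first yield
        self.state = State.PENDING
        self.history: List[State] = [self.state]

    def _set(self, state: State) -> None:
        self.state = state
        self.history.append(state)

    def send(self, value: Any) -> Any:
        """Run one iteration with a new input, then go back to Pending."""
        if self.state is State.FINISHED:
            raise RuntimeError("coroutine task has already finished")
        self._set(State.RUNNING)
        try:
            result = self._gen.send(value)
        except StopIteration:
            self._set(State.FINISHED)
            return None
        self._set(State.PENDING)
        return result


task = CoroutineTask(running_total)
outputs = [task.send(event) for event in [3, 4, 5]]
print(outputs)      # [3, 7, 12]
print(task.state)   # State.PENDING
```

Each call to `send()` plays the role of one event or one loop iteration: the task moves to a running state, produces a result for downstream tasks, and returns to pending until the next input arrives.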
---

I like to think of Prefect as “an OS for FaaS processes”, or at least FaaS written in Python and running in Dask. I don’t know to what extent this overlaps with your vision. Co-routine tasks could be used as a primitive to create complex flow processes containing event loops. They would be a big jump toward generalizing what Prefect could do.
Hey Shaun - some really interesting ideas here, thanks for sharing. While I don't have an immediate update for you, I can say that this does align closely with our (not yet announced) product vision. There have been a few attempts this year to implement various aspects of what you're describing, but all have met significant challenges related to the underlying DAG assumptions of classic workflow management. In the near future, we'll begin laying out our plan to "break the DAG" to enable interesting architectures like this one as first-class patterns. Would love to discuss further with you to learn more about what you have in mind.
Thanks for the feedback. Hmm… I'm not sure whether I should present a relatively abstract idea, which may get accolades for being a good idea but not really help you out, or present more concrete details, which are likely not to be right 🙂 (given that I am just learning the internals) but which might, somewhat accidentally, be useful. [BTW: I may have some time to work on implementation, but I have other work to do as well.] I should probably try to chart an in-between course. It seems to me that the DAG is useful for tracking task state. I envision “backward edges”, or external edges that trigger co-routine actions, as being specially marked (like “mapped()”). You can implement most currently unimplementable flow patterns using co-routine primitives. (In particular, you can implement arbitrary sequential loops and recursion. You are still missing a few “relational algebra”-like constructs, e.g. “partial fan-in”, which could be implemented by an
facility; and cross products of vectors, which would also be expressible by edge annotations. These wouldn’t need “breaking the DAG”, though.) Separately, I wonder if all edges shouldn’t be implemented with pub/sub in Dask. But I haven’t read through your current implementation in enough detail to understand the trade-offs. I would be eager to have some insight into your plans, as it would help me with my planning. I’d be happy to sign an NDA; I could also disclose some of the things we are building that may be interesting to you as well.
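As a rough illustration of the edge-annotation idea for cross products of vectors, the sketch below uses only the standard library. `Cross`, `run_with_annotations`, and `scale` are hypothetical names invented here; they are not part of Prefect, and a real annotation would presumably sit alongside `mapped()` on the edge and be resolved by the flow runner.

```python
from dataclasses import dataclass
from itertools import product
from typing import Any, Callable, List, Sequence


@dataclass(frozen=True)
class Cross:
    """Hypothetical edge annotation: combine this input with every other Cross input."""
    values: Sequence[Any]


def run_with_annotations(fn: Callable[..., Any], **inputs: Any) -> List[Any]:
    """Call fn once per combination of Cross-annotated inputs.

    Inputs that are not wrapped in Cross are passed unchanged to every call.
    """
    crossed = {k: v.values for k, v in inputs.items() if isinstance(v, Cross)}
    fixed = {k: v for k, v in inputs.items() if not isinstance(v, Cross)}
    names = list(crossed)
    results = []
    for combo in product(*(crossed[name] for name in names)):
        results.append(fn(**fixed, **dict(zip(names, combo))))
    return results


def scale(x: int, y: int, factor: int) -> int:
    """Stand-in for a task body."""
    return (x + y) * factor


results = run_with_annotations(
    scale, x=Cross([1, 2]), y=Cross([10, 20]), factor=2
)
print(results)  # [22, 42, 24, 44]
```

The fan-out here is resolved entirely from the annotations before any call is made, so the resulting graph stays acyclic, which fits the remark that these constructs would not need “breaking the DAG”.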
I think what you’re describing is an intriguing idea, but it would necessitate a significant overhaul of the current execution pipeline to achieve in Prefect today. I believe you will soon see primitives emerge that would make this possible without any special work; however, let’s find a moment to connect, because I’d like to better understand your practical objective. I’ll DM you.