https://prefect.io logo
Title
i

ishan

07/22/2022, 9:49 AM
Hi - I have a question about orchestrating a DAG of tasks with the 2.0 API. Specifically, I am trying to write something like DBT, where I define table views dependent on other views. When materializing a view, it first materializes its dependencies, and then runs the code for itself. The dependencies for each view are available statically (i.e. in the type signature etc.). • When the dependency graph is a tree, execution is easy enough. I try to run the flow for view E; E evaluates its dependencies C and D, which recursively evaluate A and B before running themselves. ◦
A -> C, B -> D, (C, D) -> E
• When the dependency graph is a DAG, I am not sure how to approach it. E will try to evaluate B and C, but both could concurrently try to evaluate A, while we only want A to be executed once. ◦
A -> B, A -> C, (B, C) -> E
A solution to this is topological sorting the full graph, and executing views as soon as their parent dependencies have already been run. I wrote the code manually but curious if the prefect API can handle it for me.
1
👀 1
a

Anna Geller

07/22/2022, 10:37 AM
wow, users like yourself will never stop surprising me! 😄 this looks super cool! But I believe Prefect cannot automatically infer those dependencies so the easiest would be to actually use something like dbt and trigger/orchestrate that from Prefect curious, why did you choose to implement it this way rather than using dbt? you like the Python interface for such transformations more than SQL? or was it just a project purely for fun, as in funsql? :)
💯 1
i

ishan

07/22/2022, 1:32 PM
Thanks for taking a look! I think creating prefect flows on the fly is the way to go then. This is definitely not for production usage, which dbt is great at. Though I don't love the sql + Jinja combination, so this code plays with alternatives :)
👍 1
a

Andreas

07/22/2022, 1:33 PM
I had to solve a similar problem for Prefect 2.0 where I wanted to create flows on the fly from already specified dag specifiacations, though I never fully finished this project - as the need for it got cancelled-, I had it working well enough. From what I remember I was also visiting the nodes in a topological order. Then I also used a technique called memoization where I had a list of future prefect's task runs. If the task was already run I just got it's result from the results list. Otherwise if there was no result available I was executing that task to make it available. This way you avoid running tasks multiple times. Also maybe using the caching functionality of Prefect can also somewhat solve this problem in a different way
i

ishan

07/22/2022, 2:12 PM
Memoization is cool! Though I don't think caching prevents a common dependency being executed twice right? If nodes B and C both depend on A, B might already have started work on A and not finished by the time C got to it.
a

Andreas

07/22/2022, 4:04 PM
Maybe when visiting the dag in topological order you save in the memorization table all the prefect future tasks you come accross instead of the results directly. Then you can simply check if the task is in a running state before starting any new work. Does this help?🤔
i

ishan

07/22/2022, 4:34 PM
oh, for sure. After a top-sort, we could just block on starting each node until its parent nodes have finished. I just meant to say that caching wouldn't help here :)
👍 2
f

Filip Floegel

07/31/2022, 8:43 AM
Sounds like messaging system like pulsar keeping the state (CQRS) could solve that