José Duarte

08/19/2022, 10:00 AM
Hey all, I’m currently looking into using Prefect to leverage dbt’s model graph. Ideally, we want to use Prefect 2’s dynamic flow creation to generate a flow for each node of the dbt graph; however, we haven’t been able to launch several flows to run at the same time. We’re definitely missing something, but couldn’t find it in the new docs. Can someone provide some insight? I’m running Prefect locally.

Oscar Björhn

08/19/2022, 10:08 AM
Are you trying to launch the flows as subflows? I believe they may be more restrictive than tasks when it comes to concurrency, but I haven't used them personally. Other than that, I'm really interested in hearing more about what you're doing! Our dbt runs are visualized in Prefect as a single task (since it's a single command being sent to dbt), but it sounds like you have a neater way of executing things.

José Duarte

08/19/2022, 10:12 AM
I’ve loaded the graph.gpickle from dbt, and yes, I’m trying to run each model as a subflow, which then uses the prefect-dbt util. To sort them, I take advantage of networkx’s topological sort utilities.
What I really want is something that allows me to launch flows without waiting, and then keep checking their status to launch the remaining flows. I know this is just a pseudo re-implementation of dbt’s launch logic, but dbt doesn’t have that integration.
Even better would be a way to simply tell Prefect: load this dbt graph and, according to the triggered node, run its children.
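The "launch without waiting, then check status and launch the rest" loop described above can be sketched with Python's standard-library graphlib (Python 3.9+, used here in place of networkx) and asyncio. This is a minimal sketch under assumptions: run_model is a hypothetical stand-in for whatever actually executes one dbt model (a Prefect subflow, a deployment run, or a prefect-dbt call), and the dependency graph is a hand-written toy dict rather than the loaded graph.gpickle.

```python
import asyncio
from graphlib import TopologicalSorter

# Toy dependency graph (hypothetical): each node maps to the set of nodes it
# depends on. A real one would be built from dbt's graph.gpickle.
DEPENDENCIES = {
    "B": {"A"},
    "C": {"A"},
    "D": {"B"},
    "E": {"B", "C"},
}


async def run_model(name: str, log: list) -> str:
    """Hypothetical stand-in for running one dbt model."""
    log.append(("start", name))
    await asyncio.sleep(0.01)  # pretend to do some work
    log.append(("end", name))
    return name


async def run_graph(dependencies: dict, log: list) -> None:
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    running = {}  # maps asyncio.Task -> node name
    while sorter.is_active():
        # Launch every node whose dependencies have all finished, without waiting.
        for node in sorter.get_ready():
            running[asyncio.create_task(run_model(node, log))] = node
        # Block until at least one running node finishes.
        done, _ = await asyncio.wait(list(running), return_when=asyncio.FIRST_COMPLETED)
        for finished in done:
            node = running.pop(finished)
            finished.result()  # re-raise if that model run failed
            sorter.done(node)  # unblocks its dependents on the next iteration


if __name__ == "__main__":
    events = []
    asyncio.run(run_graph(DEPENDENCIES, events))
    print(events)
```

graphlib's prepare/get_ready/done protocol maps directly onto the launch-then-check pattern: get_ready returns only the nodes whose predecessors have been marked done, so independent branches (B and C here) run concurrently.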

Oscar Björhn

08/19/2022, 10:21 AM
I'm just curious, why would you like to use flows for this rather than tasks? Is it so you can have them run as separate docker instances or on different vms etc?
I may have a solution you can use if you're sticking with flows, anyway. I wrote a task that starts up a flow and then polls it until it is done, which essentially lets you trigger flows with the same concurrency options you get with tasks. They'll show up as normal flows in the gui though, not subflows.
Hmm, technically it launches deployments though, not flows. Launching flows may be trickier, now that I think about it.
I guess you need to generate them dynamically
Sorry about the flow-of-consciousness style of replies, I just think it's an interesting problem.

José Duarte

08/19/2022, 10:26 AM
No worries, ideas are always welcome
Regarding the flows question, I’ll get back to you; I just need to discuss it on my side.

Oscar Björhn

08/19/2022, 10:28 AM
I might give this a go myself over the weekend, using tasks. Thanks for the info regarding graph.gpickle, sounds like it could be fun.

José Duarte

08/19/2022, 10:37 AM
The following description comes from our understanding of Prefect and might not apply to the new v2; if anything is wrong, please correct me, I’m more than glad to learn more! We’re using flows so that we can launch several of the same tasks concurrently. Consider a graph like this:
A -> B, C
B -> D, E
C -> E
Given a random node on that graph, we want to be able to launch the “topological runner” for the remaining nodes. For example:
• Given node C, I want to run C and then E.
• Given node B, I want to run B and then D and E.
I want to be able to run both scenarios at the same time.
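Picking the "remaining nodes" for a triggered node amounts to taking that node plus all of its descendants and ordering them topologically (in networkx, nx.descendants and nx.topological_sort on a subgraph do the same job). A minimal stdlib sketch, with the example graph written as parent -> children edges; descendants and plan_from are hypothetical helper names:

```python
from graphlib import TopologicalSorter

# The example graph from the thread, as parent -> children edges.
CHILDREN = {
    "A": ["B", "C"],
    "B": ["D", "E"],
    "C": ["E"],
}


def descendants(children: dict, start: str) -> set:
    """Return every node reachable from `start` (excluding `start` itself)."""
    seen, stack = set(), [start]
    while stack:
        for child in children.get(stack.pop(), []):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen


def plan_from(children: dict, start: str) -> list:
    """Topological order of `start` and its descendants (hypothetical helper)."""
    keep = descendants(children, start) | {start}
    sorter = TopologicalSorter()
    for node in keep:
        sorter.add(node)  # ensure nodes without kept edges are still included
    for parent, kids in children.items():
        if parent in keep:
            for kid in kids:
                if kid in keep:
                    sorter.add(kid, parent)  # kid runs after parent
    return list(sorter.static_order())


if __name__ == "__main__":
    print(plan_from(CHILDREN, "C"))
    print(plan_from(CHILDREN, "B"))
```

Note that both plans contain E, so running the two scenarios at the same time would execute E twice unless the runs are deduplicated or coordinated.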

Oscar Björhn

08/19/2022, 10:39 AM
Ah, makes sense. Using tasks would give you less flexibility related to launching sub nodes/paths of the graph, since tasks can't be started directly.
I think I may be a bit over my head here regarding best practices, only been on Prefect 2 for 3 weeks at this point. Hopefully someone more knowledgeable will come along.

José Duarte

08/19/2022, 1:26 PM
@Oscar Björhn would you be willing to share your solution wrt the task that launches flows?

Oscar Björhn

08/19/2022, 1:32 PM
Sure! It's uh.. probably not the most solid piece of code, but it's been working fine during our 4 days in production 😉. It's an okay start, anyhow. Give me a min.
Although, please note that it launches deployments, not flows.
import asyncio
from typing import Any, Dict, Optional

from prefect import get_run_logger, task
from prefect.client import get_client
# Import paths as of Prefect 2.x in mid-2022; later releases moved some of these.
from prefect.orion.schemas.states import TERMINAL_STATES, Failed, StateType


@task
async def run_deployment(flow_name: str, env: Env, run_name: str, parameters: Optional[Dict[str, Any]] = None, fire_and_forget: bool = False):
    """
    Creates a flow run from a deployment and polls it until it reaches a terminal state.
    If fire_and_forget is true, no polling is performed and the function exits early.
    """
    logger = get_run_logger()

    async with get_client() as client:
        deployments = await client.read_deployments()

        # Our deployments are named "<flow name> (<env>)"; find the matching one.
        deployment_id = None
        for deployment in deployments:
            if deployment.name == f"{flow_name} ({env.value})":
                deployment_id = deployment.id
                break
        if deployment_id is None:
            raise ValueError(f"No deployment found for flow {flow_name} ({env.value})")

        flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id, name=run_name, parameters=parameters)
        flow_run_id = flow_run.id

        logger.info(f"Created flow run for flow {flow_name} with name: {flow_run.name} and id: {flow_run.id}")

        # If fire_and_forget is true, don't poll, just return.
        while not fire_and_forget:
            flow_run = await client.read_flow_run(flow_run_id=flow_run_id)

            logger.info(f"Waiting for child flow run {flow_name} / {flow_run.name} with state {flow_run.state_type}")

            if flow_run.state_type in TERMINAL_STATES:
                if flow_run.state_type == StateType.COMPLETED:
                    logger.info(f"Child flow run {flow_name} / {flow_run.name} completed successfully.")
                    break
                elif flow_run.state_type in (StateType.CANCELLED, StateType.FAILED, StateType.CRASHED):
                    logger.info(f"Child flow run {flow_name} / {flow_run.name} exited non-successfully.")
                    return Failed()
                else:
                    logger.warning(f"Encountered an unknown terminal state in child flow run {flow_name}: {flow_run.state_type}")
                    return Failed()

            # Check again in 30 seconds until the child run reaches a terminal state.
            await asyncio.sleep(30)
I launch the task with tags that have a concurrency limit set, so it can't start more flows than I want to handle simultaneously.
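Prefect enforces tag-based concurrency limits on the server side (in Prefect 2 they are created with the CLI, e.g. prefect concurrency-limit create <tag> <limit>). As a purely local analogue, and not Prefect's actual mechanism, the sketch below caps the number of in-flight launches with an asyncio.Semaphore; launch, launch_all and MAX_CONCURRENT are hypothetical names, and the sleep stands in for creating and polling a child run.

```python
import asyncio

MAX_CONCURRENT = 3  # analogous to the limit set on a Prefect tag


async def launch(name: str, limiter: asyncio.Semaphore, stats: dict) -> str:
    """Hypothetical placeholder for starting and polling one child run."""
    async with limiter:  # waits here while MAX_CONCURRENT launches are active
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for polling until the run finishes
        stats["active"] -= 1
    return name


async def launch_all(names: list) -> tuple:
    limiter = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"active": 0, "peak": 0}
    # Submit everything at once; the semaphore throttles actual execution.
    results = await asyncio.gather(*(launch(n, limiter, stats) for n in names))
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(launch_all([f"model_{i}" for i in range(10)]))
    print(results, peak)
```

The design choice mirrors the tag limit: callers submit all work up front and the limiter, not the caller, decides how many runs are active, which also keeps the number of concurrent polling loops against the API bounded.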

José Duarte

08/19/2022, 1:43 PM
What would be the difference between launching deployments and flows?
From your example, it looks as though it is just as flexible, because the deployment ends up launching the flow, which can then launch subflows and tasks.
The only difference would be that a deployment has to be registered, while a flow doesn’t.

Oscar Björhn

08/19/2022, 2:11 PM
Yeah, that's it exactly.
I suppose you could have a deployment for a generic dbt flow that takes some parameter related to what node in the graph you'd like it to run?

José Duarte

08/19/2022, 2:12 PM
I was able to import everything except Env. Can you provide its import path? I can’t find it in the docs.

Oscar Björhn

08/19/2022, 2:13 PM
I'm not 100% sure whether the Orion REST API would be too happy if you ran hundreds of these tasks at the same time by the way, that's why I'm using concurrency limits.
Sorry, Env is an enum we use in my team to separate dev/test/prod environments. We also use it when we name our deployments. You should be able to just remove it.

José Duarte

08/19/2022, 2:13 PM
I’m exploring precisely to find the limits, workarounds, etc
@Oscar Björhn is there something stopping your code from being a flow?

Oscar Björhn

08/19/2022, 3:10 PM
What do you mean?
j

José Duarte

08/19/2022, 3:12 PM
I meant that your code could also be a flow, that would run as a subflow from the main one
Or at least, it seems like it

Oscar Björhn

08/19/2022, 3:16 PM
But then I wouldn't get to use all the neat concurrency features in Prefect 2, like submit, map and task concurrency limits... I think. At least that's what I remember reading. I also feel like tasks are more lightweight than flows, and it seems excessive to create a subflow only to trigger a flow.