# ask-community
m
I was playing around with Prefect 0.15.0 and I stumbled upon the `prefect build` command, which seems like an interesting command for my use case. However, I cannot find any documentation on this command. Is there any, and if so, where can I find it? Thanks!
k
Hey @Matthias Roels, does this help?
m
I already found that one, but I was hoping there was a little more documentation. But thanks anyway! I will ask here if I get stuck 😄
Maybe I will just ask a different question. I want to create several flows where the only difference between them is the way the tasks are configured. I want to explicitly register them as separate flows (not one flow with different params), each with a different schedule. To automate that, I want to leverage the following snippet:
```python
from typing import List

from prefect import Flow


def create_flows(flow_metadata_list: list) -> List[Flow]:
    flows = []
    for flow_metadata in flow_metadata_list:
        flow_name = flow_metadata["flow_name"]
        with Flow(flow_name) as flow:
            # flow definition here using the metadata;
            # the flows contain dbt code, and each flow
            # simply runs a different set of dbt models
            ...
        flows.append(flow)
    return flows
```
I want to use Docker storage and create a single Docker image for running all these flows. Therefore, I thought of using this approach from the docs: https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html. From the docs of `prefect build` and `prefect register`, I read that the build command outputs a JSON file containing metadata on flow storage that can be used during registration. Is such a workflow even possible?
k
What do you mean when you say task configuration? Do you mean different tasks in the flow, or do you have some subclass of Task?
m
The flows consist of the same tasks (running dbt, tests, …), but each flow should run a different set of dbt models (which can be set by passing different arguments to DbtShellTask). The thing is: these models should run on different schedules, so I want to register them as different flows altogether.
k
If that is the case, a schedule consists of multiple clocks and clocks can have parameters attached to them. You can package this as one flow with multiple clocks in a schedule. Just wanna be sure you’re aware of that.
m
So one flow where each set of parameters is associated with at most one clock?
k
One flow. Multiple Clocks. Each with a set of parameters
👀 2
Check the second code snippet here
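For reference, a minimal sketch of that pattern (one flow, a schedule with multiple clocks, each clock pinning its own parameter defaults; the cron strings and model selections below are made up):
```python
from prefect import Flow, Parameter
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Each clock fires on its own cadence and injects its own
# parameter values into the flow run.
schedule = Schedule(clocks=[
    CronClock("0 6 * * *", parameter_defaults={"dbt_cmd_args": "--models staging"}),
    CronClock("0 8 * * *", parameter_defaults={"dbt_cmd_args": "--models marts"}),
])

with Flow("dbt-flow", schedule=schedule) as flow:
    dbt_cmd_args = Parameter("dbt_cmd_args", default="")
    # ... tasks that use dbt_cmd_args ...
```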
m
Impressive! That makes my use-case so much simpler 😮. Thanks a lot for the help!
@Kevin Kho, I am trying to implement what we discussed in the thread above. So far, I have the following script:
```python
import os

from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask
from prefect.storage import Docker

REGISTRY_URL = os.environ.get("REGISTRY_URL")


def create_dbt_task(config: dict) -> DbtShellTask:
    """Create a DbtShellTask using the specified configuration.

    Parameters
    ----------
    config: dict
        the flow's configuration

    Returns
    -------
    DbtShellTask
        a dbt shell task configured from `config` and the environment
    """
    return DbtShellTask(
        return_all=True,
        environment=config["target"],
        profiles_dir=".",
        helper_script="cd dbt",
        env={
            "DBT_DBHOST": os.environ.get("DBT_DBHOST"),
            "DBT_USER": os.environ.get("DBT_USER"),
            "DBT_PASSWORD": os.environ.get("DBT_PASSWORD"),
            "DBT_DBNAME": os.environ.get("DBT_DBNAME"),
        },
        log_stdout=True,
        log_stderr=True,
    )


def create_dbt_flow(config: dict) -> Flow:

    dbt = create_dbt_task(config)

    flow_storage = Docker(registry_url=REGISTRY_URL, dockerfile="./Dockerfile")

    with Flow(name=config["flow_name"], storage=flow_storage) as flow:

        dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)

        seed = dbt(command="dbt seed")

        source_tests = dbt(
            command="dbt test --models source", upstream_tasks=[seed]
        )

        run = dbt(
            command=f"dbt run {dbt_cmd_args}".strip(),
            upstream_tasks=[source_tests],
        )

        test = dbt(
            command=f"dbt test {dbt_cmd_args}".strip(), upstream_tasks=[run]
        )

    return flow


# read_config() is defined elsewhere and loads the flow's configuration
config = read_config()

flow = create_dbt_flow(config)
```
However, I get an error related to the way I use Parameters. I want to use these parameters to determine which dbt models to run (and then schedule each set of arguments on a different clock) so that I can register the whole project as one flow with different parameters. How can I fix it? The error is related to
```
`dbt run <Parameter: dbt_cmd_args>'
```
so somehow the parameter is not being interpolated as a string value.
k
Think of it this way: f-strings, dictionary access, and list access happen immediately at build time, while `dbt_cmd_args` does not exist yet. You might be able to do something like `task(lambda x: f"dbt run {x}")(dbt_cmd_args)` to defer the execution of that f-string to run time. You can also make it an intermediate task.
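A minimal sketch of that intermediate-task variant (the task name is made up):
```python
from prefect import task

# The f-string is evaluated inside a task at run time, once the
# Parameter has been resolved to a concrete string.
@task
def build_run_command(dbt_cmd_args: str) -> str:
    return f"dbt run {dbt_cmd_args}".strip()
```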
m
Thanks for the hint! Will have to think about how I can implement it together with the use of the config (which is supplied at build time).
k
Does it not work? You can mix deferred and non-deferred inputs into `DbtShellTask`.
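A sketch of that mix, reusing `create_dbt_task` from the script above and the `build_run_command` task from earlier (an assumption about how the pieces fit together):
```python
from prefect import Flow, Parameter

# Build-time inputs: create_dbt_task(config) fixes env, profiles_dir,
# etc. when the flow is defined. Run-time inputs: the command string
# is produced by an upstream task from the Parameter value.
dbt = create_dbt_task(config)

with Flow(config["flow_name"]) as flow:
    dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)
    run = dbt(command=build_run_command(dbt_cmd_args))
```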
m
Even better: the inputs required by `DbtShellTask` should also be supplied as parameters (or env variables), so problem solved 😄. Next, trying to make everything work with Docker storage. Is it possible to run flows with Docker storage agentless?
k
I don’t think so. Agentless is meant for local testing.
👍 1
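For a quick local, agentless test, the flow from the script above can be run in-process (a sketch; the parameter value is made up):
```python
# In-process run for local testing; no agent or Docker storage involved.
flow.run(parameters={"dbt_cmd_args": "--models staging"})
```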