Matthias Roels (07/08/2021, 6:52 PM)
The `prefect build` command seems like an interesting command for my use-case. However, I cannot find any documentation on it. Is there any, and if so, where can I find it? Thanks!

Kevin Kho

Matthias Roels (07/08/2021, 7:26 PM)
Matthias Roels (07/08/2021, 7:44 PM)
from typing import List

from prefect import Flow


def create_flows(flow_metadata_list: list) -> List[Flow]:
    flows = []
    for flow_metadata in flow_metadata_list:
        flow_name = flow_metadata["flow_name"]
        with Flow(flow_name) as flow:
            # flow definition here using the metadata;
            # the flows contain dbt code and the different
            # flows simply run different dbt models
            ...
        flows.append(flow)
    return flows
I want to use Docker storage and create a single Docker container for running all these flows. Therefore, I thought of using this approach: https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html
From the docs of `prefect build` and `prefect register`, I read that the build command outputs a JSON file containing metadata on flow storage that can be used during registration. Is such a workflow even possible?
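For reference, the multi-flow storage recipe linked above boils down to roughly the following pattern. This is only a sketch assuming the Prefect 1.x API and reusing the create_flows factory from the earlier snippet; the metadata list, registry URL, and project name are placeholders:

from prefect.storage import Docker

# Hypothetical metadata describing the individual dbt flows
flow_metadata_list = [{"flow_name": "dbt-staging"}, {"flow_name": "dbt-marts"}]
flows = create_flows(flow_metadata_list)

# A single Docker storage object shared by all flows
storage = Docker(registry_url="my-registry.example.com", dockerfile="./Dockerfile")
for flow in flows:
    storage.add_flow(flow)  # add each flow to the same image
    flow.storage = storage

storage.build()  # build and push the image once

for flow in flows:
    # build=False: the image was already built above, so skip a per-flow rebuild
    flow.register(project_name="my-project", build=False)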
Kevin Kho

Matthias Roels (07/08/2021, 8:26 PM)

Kevin Kho

Matthias Roels (07/08/2021, 8:36 PM)

Kevin Kho

Kevin Kho

Matthias Roels (07/09/2021, 6:02 AM)
Matthias Roels (07/12/2021, 6:44 PM)
import os
import json

from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask
from prefect.storage import Docker

REGISTRY_URL = os.environ.get("REGISTRY_URL")


def create_dbt_task(config: dict) -> DbtShellTask:
    """Create a DbtShellTask using the specified configuration.

    Parameters
    ----------
    config: dict
        flow's configuration file

    Returns
    -------
    DbtShellTask
        the configured dbt shell task
    """
    return DbtShellTask(
        return_all=True,
        environment=config["target"],
        profiles_dir=".",
        helper_script="cd dbt",
        env={
            "DBT_DBHOST": os.environ.get("DBT_DBHOST"),
            "DBT_USER": os.environ.get("DBT_USER"),
            "DBT_PASSWORD": os.environ.get("DBT_PASSWORD"),
            "DBT_DBNAME": os.environ.get("DBT_DBNAME"),
        },
        log_stdout=True,
        log_stderr=True,
    )


def create_dbt_flow(config: dict) -> Flow:
    dbt = create_dbt_task(config)
    flow_storage = Docker(registry_url=REGISTRY_URL, dockerfile="./Dockerfile")
    with Flow(name=config["flow_name"], storage=flow_storage) as flow:
        dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)
        seed = dbt(command=f"dbt seed".strip())
        source_tests = dbt(
            command=f"dbt test --models source", upstream_tasks=[seed]
        )
        run = dbt(
            command=f"dbt run {dbt_cmd_args}".strip(),
            upstream_tasks=[source_tests],
        )
        test = dbt(
            command=f"dbt test {dbt_cmd_args}".strip(), upstream_tasks=[run]
        )
    return flow


config = read_config()  # read_config (not shown here) loads the flow's configuration dict
flow = create_dbt_flow(config)
However, I get an error related to the way I use Parameters. I want to use these parameters to determine which dbt models to run (and then schedule each parameter on a different clock), so that I can register the whole project as one flow with different parameters. How can I fix it? The error is related to `dbt run <Parameter: dbt_cmd_args>`, so somehow the parameter is not properly added as a string value.
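For the "schedule each parameter on a different clock" part mentioned above, attaching one clock per parameter set would look roughly like this. This is a sketch assuming Prefect 1.x; the cron strings and model selections are made up:

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Hypothetical: one clock per dbt selection, each with its own parameter defaults
schedule = Schedule(
    clocks=[
        CronClock("0 6 * * *", parameter_defaults={"dbt_cmd_args": "--models staging"}),
        CronClock("0 7 * * *", parameter_defaults={"dbt_cmd_args": "--models marts"}),
    ]
)
flow.schedule = schedule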
Kevin Kho
`dbt_cmd_args` does not exist yet at flow registration time. You might be able to do something like task(lambda x: f"dbt run {x}")(dbt_cmd_args) to defer the execution of that f-string to run time. You can make it an intermediate task also.
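Applied to the flow above, that suggestion might look like the following sketch, where an intermediate task builds the command string at run time once the parameter has a value. build_command is an illustrative helper name, and dbt, config, and flow_storage are reused from the earlier snippet:

from prefect import Flow, Parameter, task

@task
def build_command(base: str, args: str) -> str:
    # Illustrative helper: runs at flow-run time, when dbt_cmd_args has a real string value
    return f"{base} {args}".strip()

with Flow(name=config["flow_name"], storage=flow_storage) as flow:
    dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)
    run = dbt(command=build_command("dbt run", dbt_cmd_args))
    test = dbt(command=build_command("dbt test", dbt_cmd_args), upstream_tasks=[run])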
Matthias Roels (07/12/2021, 8:39 PM)

Kevin Kho
Matthias Roels (07/13/2021, 6:01 AM)
The inputs to the DbtShellTask should also be supplied as parameters (or env variables). So problem solved 😄. Next, trying to make everything work with Docker storage. Is it possible to run flows with Docker storage agentless?

Kevin Kho