Matthias Roels
07/08/2021, 6:52 PM
The `prefect build` command seems like an interesting command for my use case. However, I cannot find any documentation on it. Is there any, and if so, where can I find it? Thanks!

Kevin Kho

Matthias Roels
07/08/2021, 7:26 PM

Matthias Roels
07/08/2021, 7:44 PM

```python
from typing import List

from prefect import Flow


def create_flows(flow_metadata_list: list) -> List[Flow]:
    flows = []
    for flow_metadata in flow_metadata_list:
        flow_name = flow_metadata["flow_name"]
        with Flow(flow_name) as flow:
            # Flow definition here using the metadata: the flows
            # contain dbt code, and the different flows simply
            # run different dbt models.
            pass
        flows.append(flow)
    return flows
```
I want to use Docker storage and create a single Docker container for running all these flows. Therefore, I thought of using this approach: https://docs.prefect.io/orchestration/recipes/multi_flow_storage.html

From the docs of `prefect build` and `prefect register`, I read that the build command outputs a JSON file containing metadata on flow storage that can be used during registration. Is such a workflow even possible?
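The recipe linked above attaches several flows to a single Storage object, builds the image once, and then registers each flow without rebuilding. A minimal sketch of that pattern, assuming the `create_flows` helper from the snippet above; the registry URL, project name, and `flow_metadata_list` are placeholders:

```python
from prefect.storage import Docker

# Placeholder registry URL -- substitute your own.
storage = Docker(registry_url="my-registry.io/my-team")

flows = create_flows(flow_metadata_list)  # flow_metadata_list defined elsewhere
for f in flows:
    storage.add_flow(f)  # record the flow's location inside the image
    f.storage = storage

storage.build()  # build and push a single image containing all flows

for f in flows:
    # build=False reuses the image built above instead of rebuilding per flow
    f.register(project_name="my-project", build=False)
```

As for the CLI route: if memory serves, `prefect build` writes the resulting storage metadata to a JSON file that `prefect register --json <file>` can consume, so a build-once/register-later workflow along these lines should be possible.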
Kevin Kho

Matthias Roels
07/08/2021, 8:26 PM

Kevin Kho

Matthias Roels
07/08/2021, 8:36 PM

Kevin Kho

Kevin Kho

Matthias Roels
07/09/2021, 6:02 AM

Matthias Roels
07/12/2021, 6:44 PM

```python
import os
import json

from prefect import task, Flow, Parameter
from prefect.tasks.dbt import DbtShellTask
from prefect.storage import Docker

REGISTRY_URL = os.environ.get("REGISTRY_URL")


def create_dbt_task(config: dict) -> DbtShellTask:
    """Create a DbtShellTask using the specified configuration.

    Parameters
    ----------
    config : dict
        The flow's configuration.

    Returns
    -------
    DbtShellTask
        A configured dbt shell task.
    """
    return DbtShellTask(
        return_all=True,
        environment=config["target"],
        profiles_dir=".",
        helper_script="cd dbt",
        env={
            "DBT_DBHOST": os.environ.get("DBT_DBHOST"),
            "DBT_USER": os.environ.get("DBT_USER"),
            "DBT_PASSWORD": os.environ.get("DBT_PASSWORD"),
            "DBT_DBNAME": os.environ.get("DBT_DBNAME"),
        },
        log_stdout=True,
        log_stderr=True,
    )


def create_dbt_flow(config: dict) -> Flow:
    dbt = create_dbt_task(config)
    flow_storage = Docker(registry_url=REGISTRY_URL, dockerfile="./Dockerfile")
    with Flow(name=config["flow_name"], storage=flow_storage) as flow:
        dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)
        seed = dbt(command="dbt seed")
        source_tests = dbt(
            command="dbt test --models source", upstream_tasks=[seed]
        )
        # NOTE: these f-strings are evaluated at build time, when
        # dbt_cmd_args is still a Parameter object rather than a string;
        # this is the source of the error discussed below.
        run = dbt(
            command=f"dbt run {dbt_cmd_args}".strip(),
            upstream_tasks=[source_tests],
        )
        test = dbt(
            command=f"dbt test {dbt_cmd_args}".strip(), upstream_tasks=[run]
        )
    return flow


config = read_config()  # read_config() is defined elsewhere
flow = create_dbt_flow(config)
```
However, I get an error related to the way I use Parameters. I want to use these parameters to determine which dbt models to run (and then schedule each parameter set on a different clock), so that I can register the whole project as one flow with different parameters. How can I fix it? The error contains

`dbt run <Parameter: dbt_cmd_args>`

so somehow, the parameter is not properly rendered as a string value.

Kevin Kho
`dbt_cmd_args` does not exist yet at that point. You might be able to do something like `task(lambda x: f"dbt run {x}")(dbt_cmd_args)` to defer the execution of that f-string to run time. You can also make it an intermediate task.
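Spelled out, that suggestion might look like the following variation on `create_dbt_flow` above; the helper name `build_command` is hypothetical, and `dbt` is the `DbtShellTask` instance from the earlier snippet:

```python
from prefect import task, Flow, Parameter

@task
def build_command(base: str, extra_args: str) -> str:
    # Runs at flow-run time, when the Parameter has resolved to a real string.
    return f"{base} {extra_args}".strip()

with Flow("dbt-flow") as flow:
    dbt_cmd_args = Parameter("dbt_cmd_args", default="", required=False)
    seed = dbt(command="dbt seed")
    run = dbt(
        command=build_command("dbt run", dbt_cmd_args),
        upstream_tasks=[seed],
    )
    test = dbt(
        command=build_command("dbt test", dbt_cmd_args),
        upstream_tasks=[run],
    )
```

Because `build_command` is itself a task, the f-string is only evaluated when the flow runs and the parameter value is known.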
Matthias Roels
07/12/2021, 8:39 PM

Kevin Kho

Matthias Roels
07/13/2021, 6:01 AM
The inputs to the `DbtShellTask` should also be supplied as parameters (or env variables). So, problem solved 😄. Next up: trying to make everything work with Docker storage. Is it possible to run flows with Docker storage agentless?
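On the agentless question: if memory serves, Prefect 1.x (0.15+) added an agentless execution mode to the CLI, where the `--execute` flag runs a flow run in the current process instead of handing it to an agent; with Docker storage you would typically invoke it inside the built image. A sketch with placeholder project and flow names:

```
# Placeholder names; --execute runs the flow run in-process, without an agent.
prefect run --project my-project --name my-dbt-flow --execute
```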