Miguel Moncada

over 1 year ago
@Marvin I have this flow run from a deployment defined below failing with the following error:
```
06:41:31.085 | ERROR   | prefect.worker.kubernetes.kubernetesworker acc28481-964b-45e9-a84f-25cdaa0529a1 - Flow run 2d2e82e1-2e37-4bc1-b25d-7ffc51ee637e did not pass checks and will not be submitted for execution
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 855, in _submit_run
    await self._check_flow_run(flow_run)
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 841, in _check_flow_run
    raise ValueError(
ValueError: Flow run UUID('2d2e82e1-2e37-4bc1-b25d-7ffc51ee637e') was created from deployment 'hello_flow_deployment' which is configured with a storage block. Please use an agent to execute this flow run.
```
Can you point out how the deployment should be defined to work with my k8s work pool, with the flow executed as a Cloud Run job?
```python
from prefect.deployments import Deployment
from prefect_gcp.cloud_storage import GcsBucket
from prefect_gcp.cloud_run import CloudRunJob
from prefect.client.schemas.schedules import CronSchedule
from dataflows.flows.hello_flow import hello_flow

storage = GcsBucket.load("prefect-storage")
infrastructure = CloudRunJob.load("cloud-run-default")


def deploy_hello_flow():
    deployment = Deployment.build_from_flow(
        flow=hello_flow,
        name="hello_flow_deployment",
        work_queue_name="default",
        storage=storage,
        path="hello_flow",
        tags=["staging"],
        infrastructure=infrastructure,
        schedule=CronSchedule(cron="0 12 1 * *", timezone="UTC"),
    )
    deployment.apply()
```

Zach Munro

over 1 year ago
Hey, I am having trouble running a dbt Prefect flow, and the documentation around this is very confusing, bordering on misleading and incomplete. Here is my flow:
```python
import os
from prefect import flow, task
from prefect_dbt import DbtCliProfile, DbtCoreOperation, SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect.context import FlowRunContext

@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    credentials = SnowflakeCredentials(
        user=os.environ.get("SNOWFLAKE_USER"),
        password=os.environ.get("SNOWFLAKE_PASSWORD"),
        account=os.environ.get("SNOWFLAKE_ACCOUNT"),
        role="ACCOUNTADMIN",
    )
    connector = SnowflakeConnector(
        # schema=f"{client_name.upper()}_STG",
        schema="dbt_zmunro",
        threads=8,
        database="RAW",
        warehouse=warehouse,
        credentials=credentials,
        query_tag=f"dbt-data-build-{client}-{flow_run_name}",
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector,
        extras={
            "retry_on_database_errors": True,
            "connect_retries": 0,
            "connect_timeout": 600,
            "retry_all": False,
            "reuse_connections": False,
        },
    )
    dbt_cli_profile = DbtCliProfile(
        name="prefect-snowflake-dev",
        target="dev",
        target_configs=target_configs,
    )
    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
```
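A side note on the `--vars` argument in the flow above: it is written with hand-escaped quotes and doubled braces, which is easy to get wrong. Here is a minimal sketch, using only the standard library, that builds the same command string with `json.dumps` and `shlex.quote`. The helper name `build_dbt_command` and its `selector` parameter are hypothetical and not part of Prefect or prefect-dbt.

```python
import json
import shlex


def build_dbt_command(client: str, selector: str = "+somemodel") -> str:
    """Return a `dbt build` command string with --vars encoded as compact JSON.

    Hypothetical helper for illustration; not part of Prefect or prefect-dbt.
    """
    # Compact separators reproduce the spacing of the hand-written string.
    dbt_vars = json.dumps(
        {"client_schema": f"{client.upper()}_STG"},
        separators=(",", ":"),
    )
    # shlex.quote wraps the JSON in single quotes so a POSIX shell
    # passes it to dbt as a single argument.
    return f"dbt build --select {shlex.quote(selector)} --vars {shlex.quote(dbt_vars)}"


print(build_dbt_command("acme"))
# prints: dbt build --select +somemodel --vars '{"client_schema":"ACME_STG"}'
```

The returned string could then be passed as the single entry of the `commands=[...]` list in place of the f-string.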
I am getting an error because the `DbtCoreOperation` can't find my `dbt_cli_profile` correctly. I am trying to follow the documentation here: https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials That documentation doesn't say how to use this `dbt_cli_profile` object with the `DbtCoreOperation` function, though. I am also confused about how blocks relate to all of this. Should I stop creating these credential objects on every flow run and instead create them once and save them in a block? And where does that block get saved if I am not using Prefect Cloud?
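One thing that may be worth checking, as an assumption only since the exact error text isn't shown: dbt looks up a profile by name, and the name it looks for comes from the `profile:` key in `dbt_project.yml` (or a `--profile` flag). The sketch below uses plain dictionaries, not prefect-dbt itself, to show the `profiles.yml` layout that a profile named `prefect-snowflake-dev` with target `dev` corresponds to, plus a name check. The helpers `build_profiles` and `project_uses_profile` and the project profile name `my_project` are hypothetical; credentials are left out on purpose.

```python
import json


def build_profiles(name: str, target: str, output: dict) -> dict:
    """Return the mapping layout dbt expects in profiles.yml for one profile."""
    return {name: {"target": target, "outputs": {target: output}}}


def project_uses_profile(profiles: dict, project_profile: str) -> bool:
    """True when the name under `profile:` in dbt_project.yml exists in profiles."""
    return project_profile in profiles


# Values taken from the flow above; credentials intentionally omitted.
profiles = build_profiles(
    name="prefect-snowflake-dev",
    target="dev",
    output={
        "type": "snowflake",
        "schema": "dbt_zmunro",
        "threads": 8,
        "database": "RAW",
        "warehouse": "COMPUTE_WH",
    },
)

print(json.dumps(profiles, indent=2))
# A project whose dbt_project.yml names a different profile would not match.
print(project_uses_profile(profiles, "prefect-snowflake-dev"))
print(project_uses_profile(profiles, "my_project"))  # hypothetical project profile name
```

If the project's `profile:` value differs from the name given to `DbtCliProfile`, dbt would report that it cannot find the profile even though a `profiles.yml` was written.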