# prefect-community
d
Hi, what's the best practice for getting the value of a Prefect `Parameter` inside a flow?
a
```python
prefect.context.parameters.get("your parameter")
```
d
Thank you, but I'm getting `AttributeError: 'Context' object has no attribute 'parameters'`.
k
I think you must be using that outside a task. You can't do that inside the Flow block, because the Flow block is not deferred. It also doesn't make sense to use it there, because you can just pass the parameter directly to a task, right?
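To illustrate the distinction between building a flow and running it, here is a toy model in plain Python. It is not Prefect's API: `Parameter`, `TaskCall`, `run_flow`, and the module-level `context` dict are hypothetical stand-ins. While the flow is being built, the parameter is only a placeholder and the context holds no parameters (the analogue of the `AttributeError` above). At run time, a task can either receive the value as an argument or read it from the context.

```python
# Toy model, NOT Prefect's API: building a flow only records calls;
# parameter values exist only while the recorded calls are being run.

context = {}  # hypothetical stand-in for prefect.context: populated only during a run


class Parameter:
    """Placeholder that has a name at build time but no value."""

    def __init__(self, name):
        self.name = name


class TaskCall:
    """A deferred call recorded inside the 'Flow block'."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args


def run_flow(calls, parameters):
    """'Run time': resolve placeholders and execute each recorded call."""
    context["parameters"] = parameters
    try:
        results = []
        for call in calls:
            args = [parameters[a.name] if isinstance(a, Parameter) else a for a in call.args]
            results.append(call.fn(*args))
        return results
    finally:
        context.clear()  # the context disappears once the run is over


def format_command(env):
    # Receives the parameter value directly as an argument
    return f"dbt run --target {env}"


def env_from_context():
    # Reads the value from the run context; only works while a run is active
    return context["parameters"]["target_env"]


# "Build time": nothing has a value yet, and the context is empty.
target_env = Parameter("target_env")
calls = [TaskCall(format_command, target_env), TaskCall(env_from_context)]
print("parameters" in context)  # False

# "Run time": the value is supplied and both tasks can see it.
print(run_flow(calls, {"target_env": "dev"}))  # ['dbt run --target dev', 'dev']
```

Passing the parameter straight into the task (as `format_command` does here) is the simpler of the two patterns, because the task's dependency on the parameter is explicit in its arguments.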
m
@Kevin Kho, is there a recommended way to do this? I'm struggling with the same problem and can't solve it.
```python
@task
def get_param_env_value(target_env):
    return f"{target_env}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_env = get_param_env_value(target_env)

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```
The `target_env` parameter is always "dev" or "prod", depending on where I want to run dbt.
`dbt` is a `DbtShellTask`.
I also tried this:
```python
@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"
```
The printed value is `dev`, which is correct, but my profiles.yml still contains `!!python/object:prefect.tasks.core.function.FunctionTask` instead of the string `dev`.
k
`task_args` is not meant for the inputs into the task. The `name` makes sense, but `environment` does not. `task_args` is for the things that go in the task decorator, like `checkpoint` and `trigger`.
You shouldn't need `get_param_env_value`. Can you show me how `dbt()` was defined there?
m
Sure, here is all of it again:
```python
dbt = DbtShellTask(
    return_all=True,
    profile_name="dbt_profile",
    profiles_dir="dbt2",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt2",
    log_stderr=True,
    dbt_kwargs={
        "dataset": 'dbt_staging',
        "keyfile_json": {
            "type": credentials['type'],
            "project_id": credentials['project_id'],
            "private_key_id": credentials['private_key_id'],
            "private_key": credentials['private_key'],
            "client_email": credentials['client_email'],
            "client_id": credentials['client_id'],
            "auth_uri": credentials['auth_uri'],
            "token_uri": credentials['token_uri'],
            "auth_provider_x509_cert_url": credentials['auth_provider_x509_cert_url'],
            "client_x509_cert_url": credentials['client_x509_cert_url'],
        },
        "location": 'EU',
        "method": "service-account-json",
        "priority": "interactive",
        "project": GCP_BQ_PROJECT,
        "threads": 4,
        "timeout_seconds": 300,
        "type": "bigquery",
    },
)


@task
def format_command(target_env):
    return f"dbt run  --profiles-dir . --target {target_env} "

@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_target_env = get_param_env_value('target_env')

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_target_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})

    # send_slack_alert_on_failure_dbt(dbt_run)

dbt_flow.set_reference_tasks([dbt_run])
```
If I replace this line:
```python
task_args={"name": "DBT Run", "environment": task_param_target_env},
```
with a hard-coded value:
```python
task_args={"name": "DBT Run", "environment": "dev"},
```
it works.
k
Don't pass `environment` in `task_args`; that's not the place for it. Instead, just pass it to `dbt()`:
```python
dbt_run = dbt(..., environment=target_env)
```
m
That fails when I try to register the flow:
```python
with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    # task_param_target_env = get_param_env_value('target_env')

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        environment=target_env,
        task_args={"name": "DBT Run"},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```
k
Ah shit, now I understand the confusion and the issue. I don't think `task_args` can help you here. In my opinion, you might need to override the `run` method of `DbtShellTask` so that it takes `environment` as an argument at run time.
m
But when I do it like this:
```python
pull_repo_task = pull_dbt_repo()
dbt_run = dbt(
    command=format_command(target_env),
    env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
    upstream_tasks=[pull_repo_task],
    task_args={"name": "DBT Run", "environment": "dev"},
)
```
it works.
k
Yes, but when you use Prefect there is a build time and a run time. The `task_args` there are evaluated at build time, but Parameters only have values at run time. That's why you get a weird-looking task object (such as the `Parameter` class) as your environment instead of a string: when `task_args` is evaluated, the value of the environment is not known yet.
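The following toy model mimics the behavior described above. It is plain Python, not Prefect's implementation; `ShellTask` and `Parameter` here are hypothetical, and the copy-then-set-attributes handling of `task_args` is an assumption about how the real library applies them. Because `task_args` are applied when the flow is built, a hard-coded string lands in the attribute, while a parameter contributes only its placeholder object. That is roughly why the profile ended up containing a serialized task object instead of `dev`.

```python
import copy


class Parameter:
    """Hypothetical placeholder: has a name at build time, a value only at run time."""

    def __init__(self, name):
        self.name = name


class ShellTask:
    """Hypothetical stand-in for a task whose `environment` ends up in profiles.yml."""

    def __init__(self, environment=None):
        self.environment = environment

    def __call__(self, task_args=None):
        # Toy model: task_args are applied immediately (at build time) by
        # copying the task and setting attributes on the copy.
        new = copy.copy(self)
        for key, value in (task_args or {}).items():
            setattr(new, key, value)
        return new

    def run(self):
        # At run time the task just uses whatever attribute it was given
        return {"target": self.environment}


target_env = Parameter("target_env")
hardcoded = ShellTask()(task_args={"environment": "dev"})
from_param = ShellTask()(task_args={"environment": target_env})

print(hardcoded.run())  # {'target': 'dev'}
print(type(from_param.run()["target"]).__name__)  # Parameter
```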
m
Oh, now I get it! I didn't know `task_args` were evaluated at build time. How can I override the `run` method?
I mean, as I understand it, I need a condition based on a parameter to be able to run dbt with different targets.
k
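You subclass it like this:
```python
from prefect.tasks.dbt import DbtShellTask

class MyDbtShellTask(DbtShellTask):
    def run(self, environment=None, **kwargs):
        # Sketch: assumes the parent run() writes self.environment into profiles.yml
        self.environment = environment
        return super().run(**kwargs)  # forward command, env, etc. unchanged
```
It will be something like that, and then you use `MyDbtShellTask` instead.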
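Here is a toy model of the subclassing idea in plain Python, with hypothetical `BaseShellTask` and `RunTimeEnvShellTask` classes standing in for `DbtShellTask` and `MyDbtShellTask`. It assumes, as the thread suggests, that the base task reads `self.environment` when `run()` executes. The subclass only lets the caller supply that value at run time and forwards everything else to the parent.

```python
class BaseShellTask:
    """Hypothetical stand-in for DbtShellTask: environment is fixed at construction."""

    def __init__(self, environment=None):
        self.environment = environment

    def run(self, command=None):
        # The environment is read only when run() executes
        return f"{command} (profile target: {self.environment})"


class RunTimeEnvShellTask(BaseShellTask):
    """Same idea as MyDbtShellTask: accept environment when run() is called."""

    def run(self, environment=None, **kwargs):
        # Fall back to the constructor value if no run-time value is given
        if environment is not None:
            self.environment = environment
        return super().run(**kwargs)


task = RunTimeEnvShellTask()
print(task.run(environment="prod", command="dbt run"))  # dbt run (profile target: prod)
```

Because the sketch mutates `self`, a later call without `environment` reuses the previous value; the `is not None` check makes that fallback explicit rather than overwriting it with `None`.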
m
Nice, I'll try it. Thanks, @Kevin Kho!
For now, I solved it by creating two `DbtShellTask`s with different environments (I only need "dev" and "prod") and using `case`:
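```python
from prefect import Flow, Parameter, case, task
from prefect.tasks.control_flow import merge

# dbt_dev, dbt_prod, pull_dbt_repo, print_dbt_output and GCP_BQ_DATASET are defined elsewhere


@task
def format_command(target_env):
    return f"dbt run  --profiles-dir . --target {target_env} "

@task
def is_target_dev(target_env):
    return target_env == "dev"

with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)

    is_dev_environment = is_target_dev(target_env)
    pull_repo_task = pull_dbt_repo()
    with case(is_dev_environment, True):
        dbt_run_dev = dbt_dev(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run STAGING"},
        )

    with case(is_dev_environment, False):
        dbt_run_prod = dbt_prod(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run PROD"},
        )

    # Reusing one `dbt_run` name in both branches would make the output task depend
    # only on the PROD branch (so it is skipped for dev); merge() passes on the
    # result of whichever branch actually ran.
    dbt_run = merge(dbt_run_dev, dbt_run_prod)
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
```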
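As a rough illustration of what two `case` blocks do at run time, here is a toy model in plain Python: only the branch whose condition matches executes, the other is marked as skipped, and a downstream consumer has to pick up whichever branch actually ran. `SKIPPED`, `case_branch`, `merge`, `run_dbt`, and `dbt_flow` are all hypothetical names. In Prefect 1.x, the `merge` task in `prefect.tasks.control_flow` plays the role of the toy `merge` below.

```python
SKIPPED = object()  # hypothetical marker for a skipped task


def case_branch(condition, expected, fn, *args):
    """Run fn only if condition == expected; otherwise mark it skipped."""
    return fn(*args) if condition == expected else SKIPPED


def merge(*results):
    """Return the first branch result that was not skipped (None if all were)."""
    return next((r for r in results if r is not SKIPPED), None)


def run_dbt(name, target_env):
    # Stand-in for dbt_dev / dbt_prod: just describe what would run
    return f"{name}: dbt run --target {target_env}"


def dbt_flow(target_env):
    is_dev = target_env == "dev"
    dev_result = case_branch(is_dev, True, run_dbt, "DBT Run STAGING", target_env)
    prod_result = case_branch(is_dev, False, run_dbt, "DBT Run PROD", target_env)
    return merge(dev_result, prod_result)


print(dbt_flow("dev"))   # DBT Run STAGING: dbt run --target dev
print(dbt_flow("prod"))  # DBT Run PROD: dbt run --target prod
```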
k
Ah, that's clever, but I think `MyDbtShellTask` will reduce the boilerplate if you can get it working.
m
Yes, totally agree. 🙂
d
How about calling `parameter.run()` to get the value?
k
`Parameter.run()` would be called at registration (build) time, not at run time. I don't think calling `parameter.run()` yourself helps in any use case.
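To make the timing concrete, here is a toy model in plain Python. It assumes, loosely, that a parameter looks its value up in a run context that only exists during a flow run. The `Parameter` class and `context` dict are hypothetical, not Prefect's implementation. Calling `run()` while the flow is still being built finds no run context, so a required parameter has no value, and an optional one would just fall back to its default.

```python
context = {}  # hypothetical run context; empty while the flow is being built


class Parameter:
    def __init__(self, name, default=None, required=False):
        self.name = name
        self.default = default
        self.required = required

    def run(self):
        # Toy model: look the value up in the run context, if there is one
        params = context.get("parameters", {})
        if self.required and self.name not in params:
            raise ValueError(f"No value for required parameter {self.name!r}")
        return params.get(self.name, self.default)


target_env = Parameter("target_env", required=True)
try:
    target_env.run()  # called while building the flow: no run context yet
except ValueError as exc:
    print(exc)  # No value for required parameter 'target_env'

context["parameters"] = {"target_env": "prod"}  # what an actual run would provide
print(target_env.run())  # prod
```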