#prefect-community

Dileep Damodaran

05/31/2022, 10:49 AM
Hi, what’s the best practice for getting the value of a Prefect Parameter inside a flow?

Anna Geller

05/31/2022, 11:24 AM
prefect.context.parameters.get("your parameter")

Dileep Damodaran

05/31/2022, 11:31 AM
Thank you. I’m getting this error: AttributeError: ‘Context’ object has no attribute ‘parameters’

Kevin Kho

05/31/2022, 2:27 PM
I think you must be using that outside a task. You can’t do that inside the Flow block because the Flow block is not deferred. It also doesn’t make sense to use it there, because you can just pass the parameter directly to a task, right?

Mateo Merlo

05/31/2022, 3:16 PM
@Kevin Kho is there a recommended way to do this? I'm struggling with the same thing and can't solve it.
Copy code
@task
def get_param_env_value(target_env):
    return f"{target_env}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_env = get_param_env_value(target_env)

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})
The target_env param is always "dev" or "prod", depending on where I want to run dbt.
dbt function is a DbtShellTask
I tried with this as well:
Copy code
@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"
The printed value is dev, which is correct, but in my profiles.yml I still get !!python/object:prefect.tasks.core.function.FunctionTask instead of the dev string.

Kevin Kho

05/31/2022, 3:53 PM
task_args is not meant for the inputs into the task. The name makes sense there, but environment does not. task_args is for the things that go in the task decorator, like checkpoint and trigger. You shouldn’t need get_param_env_value. Can you show me how dbt() was defined there?

Mateo Merlo

05/31/2022, 3:56 PM
Sure, let me send you everything again:
Copy code
dbt = DbtShellTask(
    return_all=True,
    profile_name="dbt_profile",
    profiles_dir="dbt2",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script="cd dbt2",
    log_stderr=True,
    dbt_kwargs={
        "dataset": 'dbt_staging',
        "keyfile_json": {
            "type": credentials['type'],
            "project_id": credentials['project_id'],
            "private_key_id": credentials['private_key_id'],
            "private_key": credentials['private_key'],
            "client_email": credentials['client_email'],
            "client_id": credentials['client_id'],
            "auth_uri": credentials['auth_uri'],
            "token_uri": credentials['token_uri'],
            "auth_provider_x509_cert_url": credentials['auth_provider_x509_cert_url'],
            "client_x509_cert_url": credentials['client_x509_cert_url'],
        },
        "location": 'EU',
        "method": "service-account-json",
        "priority": "interactive",
        "project": GCP_BQ_PROJECT,
        "threads": 4,
        "timeout_seconds": 300,
        "type": "bigquery",
    },
)


@task
def format_command(target_env):
    return f"dbt run  --profiles-dir . --target {target_env} "

@task
def get_param_env_value(param):
    logger = prefect.context.get("logger")
    parameters = prefect.context.get('parameters')
    logger.info(parameters[param])
    return f"{parameters[param]}"


with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    task_param_target_env = get_param_env_value('target_env')

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": task_param_target_env},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})

    # send_slack_alert_on_failure_dbt(dbt_run)

dbt_flow.set_reference_tasks([dbt_run])
If I take this line:
Copy code
task_args={"name": "DBT Run", "environment": task_param_target_env},
and use this instead:
Copy code
task_args={"name": "DBT Run", "environment": "dev"},
it works

Kevin Kho

05/31/2022, 4:00 PM
Don’t pass environment to task_args. It’s not the place for it. Instead, just pass it to dbt():
Copy code
dbt_run = dbt(...,environment=target_env)

Mateo Merlo

05/31/2022, 4:04 PM
It fails when I try to register the flow:
Copy code
with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)
    # dbt_run = dbt(command=format_command(target_env))
    # task_param_target_env = get_param_env_value('target_env')

    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        environment=target_env,
        task_args={"name": "DBT Run"},
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})

Kevin Kho

05/31/2022, 4:07 PM
Ah shit, I understand the confusion and the issue. I don’t think task_args will help you. In my opinion, you might need to override the run method of DbtShellTask so that it takes environment as an input to run.

Mateo Merlo

05/31/2022, 4:12 PM
But if I do it like this:
Copy code
    pull_repo_task = pull_dbt_repo()
    dbt_run = dbt(
        command=format_command(target_env),
        env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
        upstream_tasks=[pull_repo_task],
        task_args={"name": "DBT Run", "environment": "dev"},
    )
it works.

Kevin Kho

05/31/2022, 4:17 PM
Yes, but there is a build time and a run time when you use Prefect. The task_args there is evaluated during build time, but Parameters only have values during run time. That’s why you get the weird-looking Parameter class instead as your environment. When task_args is evaluated, the environment value is not known yet.
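The build-time versus run-time distinction can be modelled without Prefect at all. The following is only a toy sketch: Placeholder, build, and run are made-up names for illustration, not Prefect APIs, and the real scheduler is far more involved. It shows that anything captured while the flow is being defined holds the placeholder object itself, while a value passed as a task input is swapped for the real value once the run starts.

```python
class Placeholder:
    """Hypothetical stand-in for a value that only exists at run time."""

    def __init__(self, name):
        self.name = name

    def resolve(self, runtime_values):
        # Look up the real value once the run has started.
        return runtime_values[self.name]


def build(settings):
    # Build time: settings are stored as-is; placeholders are NOT resolved.
    return dict(settings)


def run(task_input, runtime_values):
    # Run time: an input that is a placeholder is swapped for its real value.
    if isinstance(task_input, Placeholder):
        return task_input.resolve(runtime_values)
    return task_input


target_env = Placeholder("target_env")

# Analogous to task_args: captured while the flow is being defined.
frozen = build({"environment": target_env})

# Analogous to a task input: resolved when the flow actually runs.
resolved = run(target_env, {"target_env": "dev"})

print(type(frozen["environment"]).__name__)  # Placeholder
print(resolved)  # dev
```

In this model, the "environment" stored at build time is still the placeholder object, which is roughly why a task object ends up serialized into profiles.yml instead of the string.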

Mateo Merlo

05/31/2022, 4:19 PM
Oh, now I get it! I didn't know task_args was evaluated at build time. And how can I override the run method?
I mean, as I understand it, I need a condition based on a param to be able to run dbt with different targets.

Kevin Kho

05/31/2022, 4:22 PM
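You subclass like:
Copy code
from prefect.tasks.dbt import DbtShellTask

class MyDbtShellTask(DbtShellTask):
    def run(self, environment=None, **kwargs):
        # environment is now a run-time input, so a Parameter can feed it
        if environment is not None:
            self.environment = environment
        return super().run(**kwargs)
It will be something like that, and then you use MyDbtShellTask instead.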
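As a dependency-free illustration of that pattern, the sketch below uses a hypothetical stand-in base class (FakeShellTask is invented here and is not part of Prefect or dbt). It only shows the mechanics: the subclass accepts environment as an argument to run, stores it on the instance, and forwards the remaining keyword arguments to the parent.

```python
class FakeShellTask:
    """Hypothetical stand-in for a task whose target is fixed at construction."""

    def __init__(self, environment=None):
        self.environment = environment

    def run(self, command=None):
        # Uses whatever self.environment holds at the moment of the call.
        return f"{command} [target={self.environment}]"


class RuntimeEnvTask(FakeShellTask):
    def run(self, environment=None, **kwargs):
        # Accept the environment as a run-time input, then defer to the parent.
        if environment is not None:
            self.environment = environment
        return super().run(**kwargs)


task = RuntimeEnvTask()
result = task.run(environment="prod", command="dbt run")
print(result)  # dbt run [target=prod]
```

Because the value arrives through run rather than through the constructor or task_args, it can come from something that is only known once the flow is running.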

Mateo Merlo

05/31/2022, 4:29 PM
Nice, I will try it. Thanks @Kevin Kho!!
I just solved it by creating two DbtShellTask instances with different environments (I only need "dev" and "prod") and using the case method:
Copy code
@task
def format_command(target_env):
    return f"dbt run  --profiles-dir . --target {target_env} "

@task
def is_target_dev(target_env):
    return target_env == "dev"

from prefect.tasks.control_flow import merge

with Flow("DBT flow") as dbt_flow:
    target_env = Parameter("target_env", required=True)

    is_dev_environment = is_target_dev(target_env)
    pull_repo_task = pull_dbt_repo()
    with case(is_dev_environment, True):
        dbt_run_dev = dbt_dev(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run STAGING"},
        )

    with case(is_dev_environment, False):
        dbt_run_prod = dbt_prod(
            command=format_command(target_env),
            env={"GCP_BIGQUERY_DATASET": GCP_BQ_DATASET},
            upstream_tasks=[pull_repo_task],
            task_args={"name": "DBT Run PROD"},
        )
    # merge picks the result of whichever branch was not skipped
    dbt_run = merge(dbt_run_dev, dbt_run_prod)
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Output"})

Kevin Kho

05/31/2022, 5:30 PM
Ah that’s clever but I think the MyDbtShellTask will reduce the boilerplate if you can get it working

Mateo Merlo

05/31/2022, 6:32 PM
Yes, totally agree.

Dileep Damodaran

06/01/2022, 7:31 AM
How about parameter.run() to get the value?

Kevin Kho

06/01/2022, 2:21 PM
Calling parameter.run() there will happen at registration time, not run time. I don’t think parameter.run() helps in any use case.