#prefect-community

Ken Nguyen

04/17/2022, 10:17 PM
How do I get the value of the input of my parameters? I was doing something like this in my flow:
test_param = Parameter('test_param', default="default_val")
function(test_param)
Where I got an AttributeError:
AttributeError: 'Parameter' object has no attribute
I believe I would like to do something like
function(test_param.value)
to input just the string value of the parameter
A bit puzzled by this. With the below code example, the parameter going into print_param is supposedly the string value that I entered (output of that function is the string + the type is a str type). But with the DbtShellTask, I believe it’s passing in the parameter object (
<Parameter: dbt_command>
). Why is there a different behaviour between the two tasks?
@task
def print_param(param):
    logger.info(param)
    logger.info(type(param))

with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)
    print_param(dbt_command)
    
    dbt_run = DbtShellTask(
        command = dbt_command,
        ...,
        dbt_kwargs={
            ...
        },
    )

Kevin Kho

04/18/2022, 2:08 AM
Parameters are JSON-serialized because they go through the API, so they don’t accept Python classes. Does that make sense? Only basic Python types can be accepted. Prefect 2.0 will coerce the value if you provide a schema for it
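A minimal sketch of what "JSON-serialized" implies here, using only the standard library. The `DbtCommand` class and the `survives_json` helper are hypothetical names made up for illustration; this is not how Prefect itself checks parameter values.

```python
import json
from dataclasses import dataclass


@dataclass
class DbtCommand:
    """Hypothetical custom class, used only to show what JSON rejects."""
    text: str


def survives_json(value):
    """Return True if the value round-trips through JSON unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except TypeError:
        # json.dumps raises TypeError for objects it cannot serialize
        return False


# Basic types (dict, str, int) make the round trip.
basic_ok = survives_json({"dbt_command": "dbt run", "threads": 4})

# An instance of a custom class does not.
custom_ok = survives_json(DbtCommand("dbt run"))

print(basic_ok, custom_ok)
```

So a parameter value such as a string, number, list or dict is fine, while an arbitrary object would have to be converted to one of those first.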

Ken Nguyen

04/18/2022, 5:17 AM
I think I understand what you’re saying, but I don’t understand why that would lead to the parameter being a different object type when passed to different tasks. I guess what’s more confusing is how the parameter seemingly remains a Parameter object when passed into DbtShellTask.

Kevin Kho

04/18/2022, 1:38 PM
Ah ok, I know what you mean. Do this?
with Flow("flow-name", run_config=RUN_CONFIG, storage=STORAGE) as flow:
    dbt_command = Parameter('dbt_command', required=True)()
    print_param(dbt_command)
The first parenthesis is the init and the second one is the run. The run will force it.

Ken Nguyen

04/18/2022, 4:57 PM
I tried that and it seems like dbt_command is still being passed to DbtShellTask as a Parameter object, based on the below error message:
Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/dbt/dbt.py", line 192, in run
    return super(DbtShellTask, self).run(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/shell.py", line 131, in run
    tmp.write(command.encode())
AttributeError: 'Parameter' object has no attribute 'encode'

Kevin Kho

04/18/2022, 5:02 PM
That’s weird I can test in a bit
Ok, I re-read. Why are you using a function inside the Flow to get the param value? Functions execute immediately at Flow registration time, before the param has a value. It needs to be a task to defer the execution
The Parameter value doesn’t exist at flow registration time
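The registration-time versus run-time distinction can be sketched with plain Python stand-ins. `ToyParameter`, `ToyFlow` and `describe` are hypothetical names invented for this illustration; they only mimic the idea of deferred execution and are not Prefect's classes or its actual mechanism.

```python
class ToyParameter:
    """Stand-in for a parameter: a named placeholder with no value yet."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter: {self.name}>"


class ToyFlow:
    """Stand-in for a flow: records work now, executes it later."""

    def __init__(self):
        self.steps = []  # (function, placeholder) pairs

    def add(self, fn, placeholder):
        # Nothing is called here; the step is only recorded.
        self.steps.append((fn, placeholder))

    def run(self, parameters):
        # Placeholders are swapped for real values just before each call.
        return [fn(parameters[p.name]) for fn, p in self.steps]


def describe(x):
    return f"{type(x).__name__}: {x!r}"


flow = ToyFlow()
param = ToyParameter("dbt_command")

# Called immediately while building the flow: it sees the placeholder.
eager = describe(param)

# Recorded and called at run time: it sees the supplied value.
flow.add(describe, param)
deferred = flow.run({"dbt_command": "dbt run"})

print(eager)
print(deferred)
```

The same function receives a placeholder object when called while the flow is being built, and a plain string when the call is deferred until the run.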

Ken Nguyen

04/18/2022, 5:21 PM
Sorry, my initial message said function, but it’s actually a task. To be specific, it’s the DbtShellTask
Wait, that IS a task right?

Kevin Kho

04/18/2022, 5:22 PM
Ah yeah. Task should just work off the shelf tho?
from prefect import task, Flow, Parameter
import prefect 

@task
def abc(x):
    prefect.context.logger.info(x)
    return x

with Flow("..") as flow:
    test = Parameter("test", required=True)
    abc(test)

flow.run(parameters={"test": 2})
Ohh I understand
DbtShellTask has an init and a run. You are passing the parameter to the init, so it’s evaluated at build time, before the parameter has a value. Only the run is deferred

Ken Nguyen

04/18/2022, 5:28 PM
Ohhh
What can I do to prevent that?

Kevin Kho

04/18/2022, 5:30 PM
Only stuff in the run method can be parameterized

Ken Nguyen

04/18/2022, 5:42 PM
Can I create a task that takes in the parameter and outputs it again, then feed that task output into DbtShellTask? Does that defer the parameter to run time?

Kevin Kho

04/18/2022, 5:47 PM
But that’s effectively just passing it into the run right?
with Flow("..") as flow:
    test = Parameter("test", required=True)
    DbtShellTask(__init__here)(..., test)
which will work
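Why the init fails and the run works can be sketched with toy stand-ins. Everything below (`ToyParameter`, `ToyShellTask`, `run_flow`) is a hypothetical model written for this illustration, not Prefect's implementation; it only assumes, as described in the thread, that init arguments are stored at build time while run arguments are resolved before the run.

```python
class ToyParameter:
    """Stand-in for a parameter: a named placeholder with no value yet."""

    def __init__(self, name):
        self.name = name


class ToyShellTask:
    """Stand-in for a task class with separate init and run steps."""

    def __init__(self, command=None):
        # Build time: whatever is passed here is stored untouched.
        self.command = command

    def run(self, command=None):
        # Run time: fall back to the stored value if none was passed.
        command = self.command if command is None else command
        return command.encode()


def run_flow(task, run_kwargs, parameters):
    """Toy runner: resolve placeholders in run_kwargs, then call run()."""
    resolved = {
        key: parameters[value.name] if isinstance(value, ToyParameter) else value
        for key, value in run_kwargs.items()
    }
    return task.run(**resolved)


param = ToyParameter("dbt_command")
values = {"dbt_command": "dbt run"}

# Placeholder given to __init__: the runner never sees it, so run()
# ends up calling .encode() on the placeholder object itself.
try:
    run_flow(ToyShellTask(command=param), {}, values)
    init_error = None
except AttributeError as exc:
    init_error = str(exc)

# Placeholder given to run(): the runner resolves it to a string first.
run_result = run_flow(ToyShellTask(), {"command": param}, values)

print(init_error)
print(run_result)
```

The first case reproduces the shape of the traceback earlier in the thread; the second corresponds to passing the parameter in the second set of parentheses.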

Ken Nguyen

04/18/2022, 6:02 PM
OH
I see!
Okay, thank you for your help, I actually understood the root cause of the issue and learnt a lot more about prefect!

Kevin Kho

04/18/2022, 6:12 PM
Nice work!