<@ULVA73B9P> I'm passing a string as an argument t...
# ask-marvin
k
@Marvin I'm passing a string as an argument to a function related to a Prefect task. How can I also add that string as a tag to the task?
m
To add a string as a tag to a Prefect task, you can use the `@task` decorator and pass the tag through its `tags` keyword argument. Here is a sample Python snippet that adds a tag to a task in Prefect:
python
from prefect import flow, task

@task(name="example-task", tags=["example-tag"])
def my_task(param: str):
    print(f"Received parameter: {param}")

@flow
def my_flow(param: str):
    my_task(param)

# Running the flow with a parameter
if __name__ == "__main__":
    my_flow("Hello, Marvin!")
In this example:
- The `@task` decorator includes the `tags` argument to add a tag to the task.
- The `param` parameter is a string passed to `my_task`. The `tags` list on the decorator is fixed when the task is defined, so it cannot reference `param` directly.
For further details, you can consult the Prefect documentation on Tasks. And here is an additional example showcasing templating arguments into task run names:
python
from prefect import flow, task

@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """Just a python function with a fancy hat on!"""
    return x + y

@flow
def my_flow():
    """Tasks must be called in the flow to be executed (at this time)."""
    first_result = add(1, 2) 
    second_result = add(first_result, first_result)
    return add(first_result, second_result)

if __name__ == "__main__":
    # Run the flow
    my_flow()
    
    # You can run tasks' functions directly outside of a flow if you want
    # But it won't be tracked by Prefect, it's just a normal function
    assert add.fn(1, 2) == 3
For a complete code example, you can refer to this Prefect code example.
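To make the limitation of the static `tags=[...]` list concrete, here is a toy model in plain Python. It does not use Prefect at all: `toy_task` and `extra_tags` are hypothetical names invented for this sketch, and the only point is that decorator arguments are evaluated once at definition time, while the value of `param` only exists at call time.

```python
import functools


def toy_task(tags=()):
    """Hypothetical stand-in for a task decorator; NOT Prefect's implementation."""
    static_tags = set(tags)  # fixed once, when the decorator line is evaluated

    def decorate(fn):
        @functools.wraps(fn)
        def run(*args, extra_tags=(), **kwargs):
            # Call-time tags can only be known here, once arguments exist.
            run.last_tags = static_tags | set(extra_tags)
            return fn(*args, **kwargs)

        run.last_tags = set()
        return run

    return decorate


@toy_task(tags=["example-tag"])
def my_task(param: str) -> str:
    return f"Received parameter: {param}"


my_task("Hello")
static_only = my_task.last_tags  # only the decoration-time tag

my_task("Hello", extra_tags=["Hello"])
with_runtime = my_task.last_tags  # decoration-time tag plus the runtime value
```

In other words, a tag derived from an argument has to be attached at the point where the task is called, not in the decorator arguments.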
k
@Marvin For example, here's the current task code:
@task(name = "Connect to Snowflake")
def connect_task(target_credentials, data_object):
    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file
`data_object` is a string that I want to use as a tag for this task.
m
To use the `data_object` string as a tag for the task in Prefect, you can dynamically set the tag within the task function. Prefect 2.x supports setting tags dynamically, and you can achieve this by utilizing Prefect's `TaskRunContext`. Here’s how you could modify your code to add `data_object` as a tag:
python
from prefect import flow, task, get_run_logger
from prefect.context import TaskRunContext

@task(name="Connect to Snowflake")
def connect_task(target_credentials, data_object):
    context = TaskRunContext.get()
    context.set_tag(data_object)

    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file

@flow
def my_flow(target_credentials, data_object):
    connect_task(target_credentials, data_object)

if __name__ == "__main__":
    target_credentials = "example_credentials"
    data_object = "example_data_object"
    my_flow(target_credentials, data_object)
In this example:
1. The `TaskRunContext.get()` method retrieves the current task run context.
2. `context.set_tag(data_object)` dynamically sets the `data_object` as a tag for the task.
This code snippet ensures that your `data_object` string is used as a tag for the task. For more information on tasks and tags, you can refer to the Prefect documentation on tasks.
n
`context.set_tag(data_object)` isn't real, but you could either do
with tags(some_tag):
  my_task(..., some_kwarg=some_tag)
or write a decorator to do the same thing more cleanly. @Marvin think step by step about how such a decorator would look using `functools.wraps`
m
You're right; I appreciate the correction. To dynamically set a tag based on the `data_object`, we can use the `with tags` context manager from Prefect. To make this more concise, we can write a decorator that handles the addition of tags. Here's how you can write such a decorator using `functools.wraps`:
python
import functools
from prefect import task, flow, tags

def tag_with_data_object(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Extract the data_object argument from kwargs or args
        data_object = kwargs.get('data_object', args[1] if len(args) > 1 else None)
        
        if data_object:
            with tags(data_object):
                return fn(*args, **kwargs)
        else:
            return fn(*args, **kwargs)
    
    return wrapper

# The tagging decorator goes outside @task so the tags context is already
# active when Prefect creates the task run; tags opened inside the task body
# would be too late to apply to that run.
@tag_with_data_object
@task(name="Connect to Snowflake")
def connect_task(target_credentials, data_object):
    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file

@flow
def my_flow(target_credentials, data_object):
    connect_task(target_credentials, data_object)

if __name__ == "__main__":
    target_credentials = "example_credentials"
    data_object = "example_data_object"
    my_flow(target_credentials, data_object)
In this example:
1. `tag_with_data_object` is a decorator that wraps the call to the task.
2. It extracts `data_object` from either `kwargs` or `args` (by position, so it assumes `data_object` is the second positional argument).
3. Using `with tags(data_object)`, it dynamically sets the `data_object` as a tag.
4. The decorator is applied to `connect_task` with `@tag_with_data_object`, placed above `@task`.
This way, whenever `connect_task` is called within the flow, the `data_object` string is used as a tag for that task run. Note that the wrapped `connect_task` is a plain function, so task methods such as `.submit()` are not available on it. For additional reference, you can always consult the Prefect documentation on tags.