Kaleb King
06/20/2024, 8:13 PM
Marvin
06/20/2024, 8:13 PM
`@task` decorator and pass the tag as a keyword argument. Below is an example based on the provided information.
Here is sample Python code to add tags to a task in Prefect:
```python
from prefect import flow, task


@task(name="example-task", tags=["example-tag"])
def my_task(param: str):
    print(f"Received parameter: {param}")


@flow
def my_flow(param: str):
    my_task(param)


# Running the flow with a parameter
if __name__ == "__main__":
    my_flow("Hello, Marvin!")
```
In this example:
- The `@task` decorator includes the `tags` argument to add a tag to the task.
- The `param` parameter is a string passed to `my_task`. Here it is only printed; it is not applied as a tag, because tags given to the decorator are fixed when the task is defined.
For further details, you can consult the Prefect documentation on Tasks.
And here is an additional example showcasing templating arguments into task run names:
```python
from prefect import flow, task


@task(task_run_name="Adding {x} and {y}")  # you can template arguments into the task name!
def add(x: int, y: int) -> int:
    """Just a python function with a fancy hat on!"""
    return x + y


@flow
def my_flow():
    """Tasks must be called in the flow to be executed (at this time)."""
    first_result = add(1, 2)
    second_result = add(first_result, first_result)
    return add(first_result, second_result)


if __name__ == "__main__":
    # Run the flow
    my_flow()

    # You can run tasks' functions directly outside of a flow if you want
    # But it won't be tracked by Prefect, it's just a normal function
    assert add.fn(1, 2) == 3
```
For a complete code example, you can refer to this Prefect code example.
Kaleb King
06/20/2024, 8:15 PM
```python
@task(name="Connect to Snowflake")
def connect_task(target_credentials, data_object):
    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file
```
`data_object` is a string that I want to use as a tag for this task.
Marvin
06/20/2024, 8:15 PM
`data_object` string as a tag for the task in Prefect, you can dynamically set the tag within the task function. Prefect 2.x supports setting tags dynamically, and you can achieve this by utilizing Prefect's `TaskRunContext`.
Here's how you could modify your code to add `data_object` as a tag:
```python
from prefect import flow, task, get_run_logger
from prefect.context import TaskRunContext


@task(name="Connect to Snowflake")
def connect_task(target_credentials, data_object):
    context = TaskRunContext.get()
    context.set_tag(data_object)
    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file


@flow
def my_flow(target_credentials, data_object):
    connect_task(target_credentials, data_object)


if __name__ == "__main__":
    target_credentials = "example_credentials"
    data_object = "example_data_object"
    my_flow(target_credentials, data_object)
```
In this example:
1. The `TaskRunContext.get()` method retrieves the current task run context.
2. `context.set_tag(data_object)` dynamically sets the `data_object` as a tag for the task.
This code snippet ensures that your `data_object` string is used as a tag for the task. For more information on tasks and tags, you can refer to the Prefect documentation on tasks.
Nate
06/20/2024, 9:53 PM
`context.set_tag(data_object)` isn't real, but you could either do
```python
with tags(some_tag):
    my_task(..., some_kwarg=some_tag)
```
or write a decorator to more cleanly do the same thing
@Marvin think step by step about how such a decorator would look using `functools.wraps`
Marvin
06/20/2024, 9:53 PM
`data_object`, we can use the `with tags` context manager from Prefect. To make this more concise, we can write a decorator that handles the addition of tags.
Here's how you can write such a decorator using `functools.wraps`:
```python
import functools

from prefect import flow, tags, task


def tag_with_data_object(fn):
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Extract the data_object argument from kwargs or args
        data_object = kwargs.get('data_object', args[1] if len(args) > 1 else None)
        if data_object:
            with tags(data_object):
                return fn(*args, **kwargs)
        else:
            return fn(*args, **kwargs)
    return wrapper


# Apply the tagging decorator above @task: the tags context must be active
# when the task is called, because that is when Prefect creates the task run.
@tag_with_data_object
@task(name="Connect to Snowflake")
def connect_task(target_credentials, data_object):
    connection, cursor, log_file = connect_to_snowflake(target_credentials, data_object)
    return connection, cursor, log_file


@flow
def my_flow(target_credentials, data_object):
    connect_task(target_credentials, data_object)


if __name__ == "__main__":
    target_credentials = "example_credentials"
    data_object = "example_data_object"
    my_flow(target_credentials, data_object)
```
In this example:
1. `tag_with_data_object` is a decorator that wraps the task.
2. It extracts `data_object` from either `kwargs` or `args`.
3. Using `with tags(data_object)`, it dynamically sets the `data_object` as a tag.
4. The decorator is applied to `connect_task` using `@tag_with_data_object`.
This way, whenever `connect_task` is called within the flow, the `data_object` string is used as a tag for that task.
For additional reference, you can always consult the Prefect documentation on tags.