Is there a way to tag a task from within a task it...
# ask-community
t
Is there a way to tag a task from within a task itself? For instance, I have something like
Copy code
@task
def my_task(the_path):
    print(f"The path is: {the_path}")

@flow
def my_flow():
    paths = collection_of_random_files()
    for p in paths:
        ext = p.suffix
        with tag(ext):
            my_task.submit(my_task=p)
This should tag each call of
my_task
with the extension of the file it has been passed. In the prefect UI I could then group the task runs by these extensions. However, this level of extra indenting is a little 'meh' for me in the flow function. And in some cases I do not know the value of the extension, e.g. is
collection_of_random_files
is in of itself a
task
and is return a prefect future. A neater place to add this tag is in teh
my_task
function itself where
my_path
is resolved. Is this possible?
n
hi @Tim Galvin - you can use
with_options
here
Copy code
In [1]: from prefect import task

In [2]: @task
   ...: def foo():
   ...:     pass
   ...:

In [3]: foo.with_options(tags=["test"])()
08:16:14.288 | INFO    | prefect.engine - Created task run 'foo-a45ab9a7' for task 'foo'
08:16:14.746 | INFO    | Task run 'foo-a45ab9a7' - Finished in state Completed()
t
Hi Nate - yes that is true. However, my actual pipeline is currently structured using
.map
operations, where I am iterating over the string that has a component that I actually want to use as the tag. A better example would be
Copy code
@flow
def my_flow():
    paths = collection_of_random_files()
    my_task.with_options(
        tags=transform_path_to_tag(paths) # the ext I was iterating over before
    ).map(paths)
My problem is that my
paths
could be a prefect future, and it might not be the most elegant solution to do what that above. I kind of want the tag to be the thing that is mapped inline with the
paths
That
.with_options
pattern is one I have been using though, and I loved it so much i am using the same method name throughout my own pipeline to change values of some immutable tuples
👍 1
n
well i think if you just move the
with tags(*my_tags):
inside the task definition and pass it in, then it should work
t
FWIW - the pipeline I am working with is here: https://github.com/tjgalvin/flint/blob/main/flint/prefect/flows/continuum_pipeline.py I am actually wanting to pull out a
.beamNN
component out of a path object being passed around throughout, where
NN
ranges from 0 to 35.
Huh. Yes. Ok. That makes an incredible amount of sense.
You are a dead set legend. I had my mind set on something like using the
.with_options
and its equivalent to tag the activate task run that is calling the tagging function.... If that makes sense
n
well i think if you just move the
with tags(*my_tags):
inside the task definition and pass it in, then it should work
im actually not sure this'll work, because what is it actaully tagging? i need my coffee brb
t
Huh.... Yeah. I should mock up a test and see. I am cooking dinner down here is sunny Perth
catjam 1
n
sunny here in chicago too! but yes unfortunately what I suggested won't work because we're already inside the task, so I would probably opt for what you suggested here only modification I'd suggest is around this
I kind of want the tag to be the thing that is mapped inline with the
paths
.map
is generally for mapping the same instance of a
task
(including its tags) across many inputs it sounds like maybe instead you might want something like
Copy code
tagged_versions_of_task = [foo.with_options(tags=[tag]) for tag in tags]

for t in tagged_versions_of_task:
  t.submit(*args, **kwargs)