Tim Galvin
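05/06/2024, 10:02 AM
from prefect import flow, tags, task

@task
def my_task(the_path):
    print(f"The path is: {the_path}")

@flow
def my_flow():
    paths = collection_of_random_files()  # placeholder: returns pathlib.Path objects
    for p in paths:
        ext = p.suffix
        with tags(ext):  # task runs created inside this block get the tag
            my_task.submit(the_path=p)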
This should tag each call of `my_task` with the extension of the file it has been passed. In the Prefect UI I could then group the task runs by these extensions.
However, this extra level of indentation is a little 'meh' for me in the flow function. And in some cases I do not know the value of the extension, e.g. if `collection_of_random_files` is itself a task and returns a Prefect future.
A neater place to add this tag would be in the `my_task` function itself, where `the_path` is resolved.
Is this possible?
Nate
05/06/2024, 1:16 PM
… `with_options` here
In [1]: from prefect import task
In [2]: @task
   ...: def foo():
   ...:     pass
   ...:
In [3]: foo.with_options(tags=["test"])()
08:16:14.288 | INFO | prefect.engine - Created task run 'foo-a45ab9a7' for task 'foo'
08:16:14.746 | INFO | Task run 'foo-a45ab9a7' - Finished in state Completed()
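`with_options` does not modify `foo`; it returns a new task with the changed settings. The same copy-on-change pattern can be reproduced for plain immutable records, as in this minimal sketch assuming a `typing.NamedTuple`. The `BeamOptions` type and its fields are hypothetical and not part of Prefect.

```python
from typing import NamedTuple, Tuple


class BeamOptions(NamedTuple):
    """Hypothetical immutable options record for one processing step."""

    beam: int
    tags: Tuple[str, ...] = ()
    verbose: bool = False

    def with_options(self, **kwargs) -> "BeamOptions":
        # NamedTuple instances are immutable, so _replace builds a new tuple
        # with the given fields changed and leaves self untouched.
        return self._replace(**kwargs)


base = BeamOptions(beam=0)
beam07 = base.with_options(beam=7, tags=("beam07",))
print(base)
print(beam07)
```

Passing a field name that does not exist raises a `ValueError` from `_replace`, which catches typos in option names early.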
Tim Galvin
05/06/2024, 1:39 PM
… `.map` operations, where I am iterating over a string that has a component I actually want to use as the tag. A better example would be:
@flow
def my_flow():
    paths = collection_of_random_files()
    my_task.with_options(
        tags=transform_path_to_tag(paths)  # the ext I was iterating over before
    ).map(paths)
My problem is that my `paths` could be a Prefect future, and doing what is above might not be the most elegant solution. I kind of want the tag to be the thing that is mapped inline with the `paths`.
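Computing one tag per path, rather than one tag list for the whole `.map` call, is a small step in plain `pathlib`. This minimal sketch assumes the paths are already resolved `Path` objects (not Prefect futures); `extension_tag` and `pair_with_tags` are hypothetical helpers. `Path.suffix` keeps the leading dot and returns only the last suffix, so `image.fits.gz` yields `.gz` unless `Path.suffixes` is joined.

```python
from pathlib import Path
from typing import List, Tuple


def extension_tag(path: Path, full: bool = False) -> str:
    """Hypothetical helper: derive a tag from a file's extension.

    full=False mirrors `p.suffix` (last suffix only, leading dot kept);
    full=True joins every suffix, e.g. ".fits.gz".
    """
    return "".join(path.suffixes) if full else path.suffix


def pair_with_tags(paths: List[Path], full: bool = False) -> List[Tuple[Path, str]]:
    """Keep each path next to its own tag so the two cannot drift out of order."""
    return [(p, extension_tag(p, full)) for p in paths]


paths = [Path("obs/image.fits"), Path("obs/image.fits.gz"), Path("obs/README")]
for p, tag in pair_with_tags(paths):
    print(p.name, repr(tag))
```

A file with no extension gets an empty string as its tag, which you may want to filter out or replace before using it.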
Tim Galvin
05/06/2024, 1:40 PM
… the `.with_options` pattern is one I have been using, though, and I loved it so much I am using the same method name throughout my own pipeline to change values of some immutable tuples.
Nate
05/06/2024, 1:42 PM
… `with tags(*my_tags):` inside the task definition and pass it in, then it should work
Tim Galvin
05/06/2024, 1:43 PM
… `.beamNN` component out of a path object being passed around throughout, where `NN` ranges from 0 to 35.
Tim Galvin
05/06/2024, 1:43 PM
Tim Galvin
05/06/2024, 1:44 PM
… `.with_options` and its equivalent to tag the active task run that is calling the tagging function... If that makes sense.
Nate
05/06/2024, 1:45 PM
Well, I think if you just move the `with tags(*my_tags):` inside the task definition and pass it in, then it should work
I'm actually not sure this'll work, because what is it actually tagging? I need my coffee, brb
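Wherever the tag ends up being applied, the `.beamNN` component Tim mentions still has to be pulled out of each path first. This minimal sketch uses `re` and assumes the component appears in the file name as `.beam` followed by one or two digits and then either another `.` or the end of the name. The `beam_tag` helper and the example file names are hypothetical; only the 0 to 35 range comes from the thread.

```python
import re
from pathlib import Path
from typing import Optional

# Assumed layout: ".beamNN" somewhere in the file name, followed by "." or
# the end of the name, where NN is one or two digits.
BEAM_RE = re.compile(r"\.beam(\d{1,2})(?=\.|$)")


def beam_tag(path: Path) -> Optional[str]:
    """Hypothetical helper: return e.g. "beam07" for a ".beam07" component.

    Returns None when no beam component is found, and raises ValueError
    when the beam number falls outside the expected 0-35 range.
    """
    match = BEAM_RE.search(path.name)
    if match is None:
        return None
    nn = int(match.group(1))
    if not 0 <= nn <= 35:
        raise ValueError(f"beam number {nn} outside 0-35 in {path}")
    return f"beam{match.group(1)}"


print(beam_tag(Path("data/SB1234.beam07.ms")))
```

The tag keeps the digits exactly as they appear in the name, so `beam07` and `beam7` would be treated as different tags; normalising with `f"beam{nn:02d}"` is an option if file names are inconsistent.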
Tim Galvin
05/06/2024, 1:46 PM
Nate
05/06/2024, 1:56 PM
> I kind of want the tag to be the thing that is mapped inline with the `paths`
`.map` is generally for mapping the same instance of a task (including its tags) across many inputs
It sounds like maybe instead you might want something like:
tagged_versions_of_task = [foo.with_options(tags=[tag]) for tag in my_tags]
for t in tagged_versions_of_task:
    t.submit(*args, **kwargs)  # args/kwargs: the inputs that belong with this tag
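Nate's suggestion amounts to one tagged copy of the task per distinct tag. The plain-Python part, grouping paths by the tag derived from each one, might look like this sketch. `group_by_tag` and the key function are hypothetical, and the paths are assumed to be already resolved rather than Prefect futures.

```python
from collections import defaultdict
from pathlib import Path
from typing import Callable, Dict, Iterable, List


def group_by_tag(
    paths: Iterable[Path], to_tag: Callable[[Path], str]
) -> Dict[str, List[Path]]:
    """Hypothetical helper: bucket paths by the tag derived from each one.

    Groups appear in the order their tag is first seen.
    """
    groups: Dict[str, List[Path]] = defaultdict(list)
    for p in paths:
        groups[to_tag(p)].append(p)
    return dict(groups)


paths = [Path("a.fits"), Path("b.txt"), Path("c.fits")]
groups = group_by_tag(paths, lambda p: p.suffix)
for tag, group in groups.items():
    # In the flow, each group would go to its own tagged copy of the task,
    # e.g. my_task.with_options(tags=[tag]).map(group)
    print(tag, [p.name for p in group])
```

Every run in a group then carries that group's tag, which is what lets the Prefect UI group runs by extension or beam. If `paths` is itself a Prefect future, it would presumably need to be resolved first (for example with `.result()`), since the tag has to be known when each task run is created.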