s
I'm trying to dynamically add tags to a flow by iterating through a list. Tried a few different things, but keep getting
TypeError: 'list' object does not support the context manager protocol
or
TypeError: 'generator' object does not support the context manager protocol
Here's one of my attempts. Any suggestions on how to do this?
from prefect import flow, task, tags
from prefect.context import TagsContext

def add_tags(tag):
    with tags(tag):
        yield

@flow()
def my_flow():
    # print(TagsContext.get().current_tags)
    print('inside flow')


tag_list = ['mytag1', 'mytag2']

with [add_tags(x) for x in tag_list]:
    my_flow()
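Note on the two errors above: a `with` statement needs an object that implements `__enter__` and `__exit__`, and neither a list nor a bare generator does. The sketch below uses only the standard library to show the failure and one general way to enter a variable number of context managers, `contextlib.ExitStack`. The names `tag_scope` and `active_tags` are hypothetical stand-ins for illustration, not Prefect APIs.

```python
from contextlib import ExitStack, contextmanager

active_tags = []  # hypothetical stand-in for a tag context, not Prefect's

@contextmanager
def tag_scope(tag):
    """Hypothetical single-tag context manager used only for this sketch."""
    active_tags.append(tag)
    try:
        yield
    finally:
        active_tags.remove(tag)

tag_list = ["mytag1", "mytag2"]

# A list of context managers is not itself a context manager.
# Newer Python versions raise TypeError here; older ones raise AttributeError.
list_failed = False
try:
    with [tag_scope(t) for t in tag_list]:
        pass
except (TypeError, AttributeError):
    list_failed = True

# ExitStack enters each context manager in turn and exits them all at the end.
with ExitStack() as stack:
    for t in tag_list:
        stack.enter_context(tag_scope(t))
    seen_inside = list(active_tags)  # both tags are active at this point
```

With Prefect specifically this is probably more machinery than needed, since `tags` accepts several names in one call, as the rest of the thread shows.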
n
hi @Sean Conroy - hmm can you explain what's motivating your use case? are you trying to add tags to a flow based on conditions that arise within the flow context?
s
Hi @Nate so we're setting up a service that can be accessed by a variety of other services, and when those services access this service, we are trying to get them to use custom tags to keep track of the flow runs. It would be a preference, not a requirement, to allow for an unbounded number of tags to be added to the flow run, to accommodate all the various run conditions....
n
okay, I guess I'm wondering where the
from prefect import flow, task, tags
from prefect.context import TagsContext

@flow(log_prints=True)
def my_flow():
   print(TagsContext.get().current_tags)

with tags("foo", "bar"):
   my_flow()
syntax becomes insufficient for your use case
s
It may not be...maybe i'm not understanding it, but for the example above i assumed that I would have to know that there will be two tags which are 'hardcoded' in. But it would be nice if that was not a requirement
n
i guess it depends, how/when do you find out what a given flow run should be tagged with? perhaps a standalone deployment of a flow (without any inherent tags) that you trigger with `run_deployment(..., tags=[...])` would be easiest in some cases
d
Are you changing tag of a running flow?
s
No...just triggering new flow runs
Nate - yes - I have used that, and it would be perfect, except using run_deployment would trigger a run in a different pod, and we want to keep it within the same pod, so that's why we are calling the flow directly using `with tags()`
n
ahh that makes sense, could you do something like this then?
customer_to_tags = {"customer 1": ["foo", "bar"], ...}

@flow
def customer_facing_flow(customer: str):

   with tags(*(customer_to_tags.get(customer) or [])):
      my_flow()
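Note on the snippet above: the `*` unpacks a list of any length into separate positional arguments, and `or []` turns a missing customer (where `.get` returns `None`) into an empty list, so the call still works with zero tags. A minimal standard-library sketch of that pattern follows; `fake_tags` and `tags_for` are hypothetical stand-ins and only assume that the real context manager accepts any number of tag names.

```python
from contextlib import contextmanager

@contextmanager
def fake_tags(*new_tags):
    """Hypothetical stand-in for a variadic tags context manager."""
    yield set(new_tags)

customer_to_tags = {"customer 1": ["foo", "bar"]}

def tags_for(customer):
    # .get() returns None for unknown customers; `or []` makes that an
    # empty list, and * spreads the list into positional arguments.
    with fake_tags(*(customer_to_tags.get(customer) or [])) as current:
        return current

known = tags_for("customer 1")
unknown = tags_for("someone else")
```

The tag list can come from anywhere at run time (a request payload, a config file), so nothing about the number of tags has to be hardcoded.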
s
oh interesting. let me give that a shot!
hmm getting a
TypeError: 'list' object is not callable
n
can you show your code?
s
from prefect import flow, tags
tags = ["foo", "bar"]

@flow
def my_flow():
    print('inside flow')

@flow
def customer_facing_flow(customer: str):

   with tags(*(tags or [])):
      my_flow()

customer_facing_flow('customer 1')
i simplified it a bit...
n
ah I'd choose a diff name for your `tags` like
from prefect import flow, tags
customer_tags = ["foo", "bar"]

@flow
def my_flow():
    print('inside flow')

@flow
def customer_facing_flow(customer: str):

   with tags(*(customer_tags or [])):
      my_flow()

customer_facing_flow('customer 1')
you overwrote the imported context manager `tags` with a list which is not callable
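Note: the shadowing problem can be reproduced without Prefect. In the sketch below, `contextlib.nullcontext` is imported under the name `tags` purely as a stand-in for `from prefect import tags`; assigning a list to the same name replaces the imported callable for the rest of the module.

```python
# Stand-in for `from prefect import tags`; any imported callable behaves the same.
from contextlib import nullcontext as tags

callable_before = callable(tags)  # the imported name is callable

tags = ["foo", "bar"]  # rebinding the name discards the imported object

callable_after = callable(tags)  # now it is just a list

try:
    tags("foo")  # same failure as `with tags(...)` in the thread
except TypeError as exc:
    error_message = str(exc)
```

Using a distinct variable name for the data, such as `customer_tags`, keeps the imported name intact.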
s
of course.
yes - i renamed it and works great now
thanks so much for the help!
n
sure thing!