Is it possible to dynamically assign tags to task ...
# prefect-community
j
Is it possible to dynamically assign tags to task runs based on parameters? I’m using Prefect 1.x and want to do something like the following across multiple flows.
my_task.map(databases)
But I don’t want to overload my databases, so I want to set concurrency limits at the task level to make sure there are only ever N < limit tasks running against a given database at one time. I know you can set task tags when I’m creating the task object, but can I do so as the flow is executing against mapped tasks?
m
Hey Josh, there currently is not a way to do dynamic tag assignment. I do think it would be a really interesting feature to look further into. I'll open a github issue to look into dynamic tag assignment and a feature enhancement to create an API route for updating task run tags after-the-fact.
@Marvin open "Dynamic tag assignment"
d
@Marvin open “Dynamic tag assignment”
@Marvin shaking angry fist
d
He got there
a
Hi @Josh! I think there was some confusion here with respect to tags. Tags set on a task are used purely for task-based concurrency limits. Therefore, to solve your problem, you can set a tag that corresponds to your DB connection limit directly on my_task before mapping over it. This is already possible!
@task(tags=["db"])  # this sets your tag "db"
now assuming that your DB can only handle 10 parallel connections, you can limit concurrency using:
prefect concurrency-limit create db 10
This way, only 10 mapped child tasks will run concurrently and your DB won't be overwhelmed. All of this is already possible, so I will close the issue.
j
@Anna Geller I’m asking if it is possible to do this dynamically, based on the parameters passed to the task run. For instance:
queries = ["db1", "db2", "db1", "db2", "db3"]
my_task.map(queries)
In this instance, I want the task runs of my_task to get tags based on the parameter that determines which db each run will execute against. I don’t want to assign a static tag to my_task that will be the same across all parameters.
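To make the requested behavior concrete, here is a minimal sketch in plain Python, not Prefect, of a per-database concurrency limit: every database name gets its own semaphore, so runs against db1 never block runs against db2. The names run_query and LIMIT_PER_DB and the limit value are hypothetical and chosen only for illustration.

```python
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

LIMIT_PER_DB = 1  # hypothetical per-database limit, not a Prefect setting

# One semaphore per database name, created lazily on first use.
_semaphores = defaultdict(lambda: threading.Semaphore(LIMIT_PER_DB))
_lock = threading.Lock()
running = defaultdict(int)  # current number of runs per database
peak = defaultdict(int)     # highest concurrency observed per database


def run_query(db):
    # Look up (or create) the semaphore for this database under a lock.
    with _lock:
        sem = _semaphores[db]
    with sem:  # blocks while LIMIT_PER_DB runs are already using this db
        with _lock:
            running[db] += 1
            peak[db] = max(peak[db], running[db])
        time.sleep(0.01)  # stand-in for the real database work
        with _lock:
            running[db] -= 1
    return db


queries = ["db1", "db2", "db1", "db2", "db3"]
with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(run_query, queries))
```

The limit is keyed by the argument value rather than by the function, which is the difference between this and a single static tag on my_task.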
a
I would highly encourage you to build a Minimal Reproducible Example showing a full workflow and demonstrating what exactly you are trying to accomplish and why this is not possible with the current task-based concurrency limits. I'm confident that your use case is already achievable but you may need a small tweak in your workflow design
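One possible shape for such a workflow tweak, sketched under assumptions: group the inputs by database before mapping, so that each group can be handled by a task that carries its own static tag. The helper group_by_database is hypothetical. The grouping is plain Python; the Prefect part appears only as a comment because it relies on the assumption that Prefect 1.x's Task.map accepts a task_args override for tags, which should be checked against the docs for your version.

```python
from collections import defaultdict


def group_by_database(queries):
    """Group mapped inputs by the database they target (hypothetical helper)."""
    groups = defaultdict(list)
    for db in queries:
        groups[db].append(db)
    return dict(groups)


queries = ["db1", "db2", "db1", "db2", "db3"]
groups = group_by_database(queries)
print(groups)
# {'db1': ['db1', 'db1'], 'db2': ['db2', 'db2'], 'db3': ['db3']}

# Inside a Prefect 1.x flow, each group could then be mapped separately with
# its own tag. Assumption to verify, not confirmed in this thread:
#
#     for db, items in groups.items():
#         my_task.map(items, task_args={"tags": {db}})
```

Each tag (db1, db2, db3) would then need its own concurrency limit, created the same way as the single db limit shown earlier in the thread.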