Hi team, I have a flow containing multiple tasks, ...
# prefect-community
a
Hi team, I have a flow containing multiple tasks that are intensive in both computation and memory. Because of the nature of the business logic, within one flow run a task might be called several times, with either the same or different parameters. To avoid wasting resources and to make full use of the task result cache, I want to limit the concurrency of task runs with the same parameters to 1. Other task runs with the same parameters should wait in a Pending or AwaitingRetry state.
I implemented this behavior in Prefect 1.x by wrapping the task function in a Redis lock context manager. If a task run acquires the lock, it continues in the Running state. Otherwise, it raises a Retry signal, enters the Retrying state, retries until it acquires the lock, and then returns the cached result.
My question: is it possible to do this with Prefect 2.x? Can I manually set a task run's state to Pending or Scheduled during flow runtime? (I read the code and I know Prefect uses a set of rules and core policies to manage the transitions between states. I wonder if there is any way to override or inject rules from the client side.) Also, I know Prefect 2 has concurrency limits controlled by tags. Is there a feasible way to create and destroy tags, and the concurrency limits attached to them, based on "task name + parameters" at runtime? Any advice would be much appreciated.
a
It looks like cache_key_fn with task_input_hash is what you need. This way, you can cache the task run computation based on its input arguments, i.e. the task input hash. We also have an open issue somewhere on GitHub to allow manually clearing the cache.
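To make the idea of an input-based cache key concrete, here is a minimal standard-library sketch. It is not Prefect's implementation of task_input_hash; the names input_hash, run_cached and the in-memory _cache dict are hypothetical and only illustrate that identical arguments map to the same key, so the second call can reuse the stored result.

```python
import hashlib
import json

# Hypothetical in-memory stand-in for a result cache (not Prefect storage).
_cache = {}


def input_hash(task_name, **params):
    """Derive a stable key from a task name and its keyword arguments."""
    # sort_keys makes the key independent of keyword order.
    payload = json.dumps({"task": task_name, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(task_name, fn, **params):
    """Run fn only if no result is stored for this name and these arguments."""
    key = input_hash(task_name, **params)
    if key not in _cache:
        _cache[key] = fn(**params)
    return _cache[key]
```

Note that a cache like this only helps once a result has been stored; two calls that start at the same moment would both miss the cache and both compute.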
a
Thank you Anna. I have already looked into cache_key_fn. This feature is fantastic. However, what I really want is to allow only 1 task run to enter the Running state when several task runs with the same input are triggered simultaneously. Given the snippet below, is it possible to allow only the first expensive_task to transition to Running, while the second one waits until the first one completes and populates the cache?
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def expensive_task(x):
    # do something consuming a lot of cpu and ram here
    print(x)

@flow
def dummy_flow():
    expensive_task.submit(1)
    expensive_task.submit(1)
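For illustration only, the desired behavior (one execution per "task name + parameters", with later callers waiting and then reusing the result) can be sketched in-process with one lock per key. This uses threads and standard-library locks as a stand-in for the Redis lock described earlier; it does not touch Prefect task run states, and PerKeyRunner, expensive and demo are hypothetical names.

```python
import threading
import time
from collections import defaultdict


class PerKeyRunner:
    """Allow one execution per key; later callers wait and reuse the result."""

    def __init__(self):
        self._guard = threading.Lock()             # protects the lock registry
        self._locks = defaultdict(threading.Lock)  # one lock per key
        self._results = {}                         # stand-in for the result cache
        self.executed = []                         # keys that actually ran

    def run(self, key, fn, *args):
        with self._guard:
            lock = self._locks[key]
        with lock:  # callers with the same key queue up here
            if key not in self._results:
                self.executed.append(key)
                self._results[key] = fn(*args)
            return self._results[key]


def expensive(x):
    time.sleep(0.05)  # placeholder for CPU- and memory-heavy work
    return x * 2


def demo():
    """Submit inputs 1, 1 and 2 concurrently; return the keys that executed."""
    runner = PerKeyRunner()
    threads = [
        threading.Thread(target=runner.run, args=(("expensive", x), expensive, x))
        for x in (1, 1, 2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(runner.executed)
```

In this sketch the two calls with input 1 share a lock, so only one of them computes, while the call with input 2 holds a different lock and is free to run at the same time.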
v
Thanks @Anqi Lu for highlighting this; I think I have a similar use case. @Anna Geller, do you think there is currently a way to control concurrency based on both the task and its parameters?
a
I really want to do is to only allow 1 task to enter Running state
For this, you would need to leverage concurrency limits. Perhaps you can combine caching AND concurrency limits? Alternatively, you can consider making it a subflow and using idempotency keys.
@task(cache_key_fn=task_input_hash, tags=["expensive"])
then:
prefect concurrency-limit create expensive 1
v
So does that mean that at any given time only one task run with the tag expensive would run, even if the task_input_hash for those runs differs? Is my understanding correct?
a
yup correct
so the effect would be: at most 1 concurrent task run of that task, and if the task run result is cached, it won't be re-executed until the cache expires
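As a rough in-process analogy for why a tag-level limit of 1 serializes runs regardless of their inputs, the sketch below gates work behind one semaphore per tag. It is not how Prefect enforces concurrency limits; TagLimiter and peak_concurrency are hypothetical names, and threads stand in for task runs.

```python
import threading
import time


class TagLimiter:
    """Hypothetical tag -> semaphore registry (an analogy, not Prefect's mechanism)."""

    def __init__(self):
        self._limits = {}

    def create(self, tag, limit):
        self._limits[tag] = threading.Semaphore(limit)

    def run(self, tags, fn, *args):
        # Acquire in sorted tag order so concurrent callers cannot deadlock.
        held = [self._limits[t] for t in sorted(tags) if t in self._limits]
        for sem in held:
            sem.acquire()
        try:
            return fn(*args)
        finally:
            for sem in reversed(held):
                sem.release()


def peak_concurrency(limit, inputs):
    """Run one thread per input under the 'expensive' tag; return the peak overlap."""
    limiter = TagLimiter()
    limiter.create("expensive", limit)
    guard = threading.Lock()
    state = {"running": 0, "peak": 0}

    def work(x):
        with guard:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.02)  # placeholder for the expensive computation
        with guard:
            state["running"] -= 1

    threads = [
        threading.Thread(target=limiter.run, args=(["expensive"], work, x))
        for x in inputs
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]
```

With a limit of 1, inputs 1, 2 and 3 never overlap even though they are different, because the limit is attached to the tag and not to the arguments.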
v
Thanks @Anna Geller, though what I wanted was to limit concurrency to 1 based on the task and its input parameters together. So if the same task is called with different parameters, I would like those runs to execute concurrently.
a
Caching and concurrency control are two separate concerns, though. You can implement both separately. We also have flow-level caching as a feature request on GitHub; perhaps this would help you run parametrized flows with flow-level caching?