Anqi Lu
11/28/2022, 4:03 AM
Anna Geller
11/28/2022, 8:11 AM
Anqi Lu
11/28/2022, 3:08 PM
cache_key_fn. This feature is fantastic. However, what I really want is to allow only one task to enter the Running state when several tasks with the same input are triggered simultaneously.
Given the snippet below, is it possible to allow only the first expensive_task to transition to Running, while the second one waits until the first one completes and generates the cache?
from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash)
def expensive_task(x):
    # do something consuming a lot of CPU and RAM here
    print(x)


@flow
def dummy_flow():
    # both submissions use the same input, so they map to the same cache key
    expensive_task.submit(1)
    expensive_task.submit(1)
Vipul
11/29/2022, 7:19 PM
Anna Geller
11/29/2022, 8:37 PM
"I really want to do is to only allow 1 task to enter Running state"
For this, you would need to leverage concurrency limits. Perhaps you can combine caching and concurrency limits? Alternatively, you can consider making it a subflow and using idempotency keys.
@task(cache_key_fn=task_input_hash, tags=["expensive"])
then:
prefect concurrency-limit create expensive 1
Vipul
11/29/2022, 8:42 PM
Anna Geller
11/29/2022, 11:54 PM
Vipul
11/30/2022, 6:26 PM
Anna Geller
11/30/2022, 7:11 PM