
yu zeng

06/23/2022, 3:44 AM
Hello, is there any way to set task or flow priority when using the Dask executor on Prefect 1.0 (e.g., with dask.annotate(resources={"GPU": 1}))? @Anna Geller, I know Prefect 2.0 supports this, but for some reasons I can't use 2.0.

Kevin Kho

06/23/2022, 3:48 AM
Anna is on vacation for a bit. You can find the details in the documentation here. Yes, it can be done.

yu zeng

06/23/2022, 4:16 AM
Hi Kevin, thanks for your answer. But I still don't know how to set priority on a task, because the link you gave only shows how to set worker resources, not priority.
@Kevin Kho

Kevin Kho

06/23/2022, 4:18 AM
Ah, there is no further concept of priority in Prefect 1. Prefect 1 is really just concerned with scheduling.

yu zeng

06/23/2022, 4:33 AM
Can your team add a dask-priority:10 tag as a simple way to implement this feature? This is very important in production: many tiny-batch tasks need a higher priority than big-batch tasks. Priority is a really strong user need for scheduling. Others say the same here -> https://github.com/PrefectHQ/prefect/issues/3905

Kevin Kho

06/23/2022, 4:56 AM
We’d rather have new features go to Prefect 2 moving forward, and it’s already supported there, so we’re unlikely to implement this for Prefect 1. I think you might be able to add a weight for one task already, but not for mapping.

yu zeng

06/23/2022, 5:11 AM
Hi Kevin, thanks again. But what does 'weight' mean, and how do I set a weight for one task?

Kevin Kho

06/23/2022, 5:19 AM
So the Dask syntax is like this:
with dask.annotate(priority=10):
    future = client.submit(func, *args)
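So you might be able to do it in a Prefect task like this:
import dask
from dask.distributed import worker_client
from prefect import task

@task
def mytask():
    # func and args are placeholders for the function to run and its arguments
    with worker_client() as client:
        # Ask the Dask scheduler to favor this submission over lower-priority work
        with dask.annotate(priority=10):
            future = client.submit(func, *args)
    return future.result()
Not sure this would work, though.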
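For reference, outside of Prefect the priority can also be passed straight to Client.submit through its priority keyword, where higher values are scheduled first when tasks compete for workers. Below is a minimal sketch, assuming a throwaway in-process cluster; square and submit_with_priority are hypothetical names used only for illustration, and nothing here is specific to Prefect 1.

```python
from dask.distributed import Client


def square(x):
    """Stand-in for a small, latency-sensitive unit of work (hypothetical)."""
    return x * x


def submit_with_priority(client, func, items, priority):
    """Submit one Dask task per item with the given scheduling priority.

    Higher priority values are preferred by the scheduler when several
    tasks are waiting for a free worker thread.
    """
    futures = [client.submit(func, item, priority=priority) for item in items]
    # gather() returns results in the same order as the futures list
    return client.gather(futures)


if __name__ == "__main__":
    # In-process cluster for illustration only; a real deployment would
    # connect to an existing scheduler address instead.
    with Client(processes=False, n_workers=1, threads_per_worker=1,
                dashboard_address=None) as client:
        print(submit_with_priority(client, square, [1, 2, 3], priority=10))
```

Priority only affects the order in which waiting tasks are picked up; it does not interrupt tasks that are already running.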

yu zeng

06/23/2022, 5:39 AM
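import dask
from dask.distributed import worker_client
from prefect import Flow, task

@task
def pre_task():
    return [1, 2, 3]

@task
def mytask(some_thing):
    # func is a placeholder for the function to run at high priority
    with worker_client() as client:
        with dask.annotate(priority=10):
            future = client.submit(func, some_thing)
    return future.result()

# Prefect 1 tasks are mapped inside a Flow context; the flow name is a placeholder
with Flow("priority-mapping") as flow:
    mytask.map(pre_task())
Hi Kevin, thanks again. As you said, I think I can implement mapping priority this way?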

Kevin Kho

06/23/2022, 5:48 AM
Maybe yes, but not super sure. It’s also a bit of double effort because Prefect already uses client.submit() to submit a task, so you are submitting another one. It’s not ideal, but it might help your use case.
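The double submission can be seen in plain Dask, without Prefect. In the rough sketch below, the outer client.submit stands in for what the Prefect executor does for each task (an assumption made for illustration), and the inner submission from inside the worker carries the priority. heavy_step, outer_task, and run_nested are hypothetical names.

```python
from dask.distributed import Client, worker_client


def heavy_step(x):
    """Stand-in for the real work that should run at high priority (hypothetical)."""
    return x + 1


def outer_task(x, priority=10):
    """Runs on a worker and submits a second Dask task from there."""
    # worker_client() secedes from the worker's thread pool by default, so
    # the outer task does not hold a worker slot while it waits.
    with worker_client() as client:
        future = client.submit(heavy_step, x, priority=priority)
        return future.result()


def run_nested(client, items):
    """First submission per item; each outer task triggers a second one."""
    outer_futures = [client.submit(outer_task, item) for item in items]
    return client.gather(outer_futures)


if __name__ == "__main__":
    # In-process cluster for illustration only
    with Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None) as client:
        print(run_nested(client, [1, 2, 3]))
```

Each item therefore costs two scheduler tasks, and only the inner one is prioritized; the outer task still waits in the queue at default priority.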