Hello, is there any way to set task or flow priority when using the Dask executor on Prefect 1.0 (e.g., with dask.annotate(resources={"GPU": 1}))? @Anna Geller I know Prefect 2.0 supports this, but for some reasons I can't use 2.0.
Kevin Kho
06/23/2022, 3:48 AM
Anna is on vacation for a bit. You can find the details in the documentation here. Yes, it can be done.
yu zeng
06/23/2022, 4:16 AM
Hi Kevin, thanks for your answer. But I still don't know how to set priority on a task, because the link you gave only shows how to set worker resources, not priority.
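For context, the worker-resources feature mentioned above is driven in Prefect 1 by task tags of the form dask-resource:KEY=NUM, which the DaskExecutor passes to Dask as worker resources. The sketch below only illustrates that tag-to-resources mapping in plain Python; parse_resource_tags is a hypothetical helper written for this note, not Prefect's actual code, and it says nothing about priority.

```python
from typing import Dict, Iterable

PREFIX = "dask-resource:"  # tag prefix documented for the Prefect 1 DaskExecutor


def parse_resource_tags(tags: Iterable[str]) -> Dict[str, float]:
    """Turn tags like 'dask-resource:GPU=1' into {'GPU': 1.0}; ignore other tags.

    Hypothetical helper for illustration only, not Prefect's implementation.
    """
    resources: Dict[str, float] = {}
    for tag in tags:
        if tag.startswith(PREFIX):
            # Split "GPU=1" into the resource name and its numeric amount
            key, _, num = tag[len(PREFIX):].partition("=")
            resources[key] = float(num)
    return resources


print(parse_resource_tags(["dask-resource:GPU=1", "nightly"]))
```

A resources dict like this restricts where a task may run; it does not change the order in which queued tasks are picked up, which is what priority is about.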
yu zeng
06/23/2022, 4:16 AM
@Kevin Kho
Kevin Kho
06/23/2022, 4:18 AM
Ah, there is no further concept of priority in Prefect 1. Prefect 1 is really just concerned with scheduling.
yu zeng
06/23/2022, 4:33 AM
Could your team add a dask-priority:10 tag as a simple way to implement this feature? This is very important in production: many tiny-batch tasks need a higher priority than big-batch tasks. Priority is a strong user need for scheduling. This issue says the same thing -> https://github.com/PrefectHQ/prefect/issues/3905
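To make the request concrete, here is a toy model of what priority buys in this scenario: tasks with a higher priority are popped first, and ties fall back to submission order. This is a plain-Python sketch of a highest-priority-first queue, not Dask's scheduler (which weighs other factors too); run_order and the task names are made up for illustration.

```python
import heapq
from itertools import count


def run_order(tasks):
    """Return task names in the order a highest-priority-first queue pops them.

    tasks: iterable of (name, priority) pairs in submission order.
    """
    seq = count()  # tie-breaker: earlier submissions first among equal priorities
    # heapq is a min-heap, so negate the priority to pop the highest first
    heap = [(-priority, next(seq), name) for name, priority in tasks]
    heapq.heapify(heap)
    order = []
    while heap:
        _, _, name = heapq.heappop(heap)
        order.append(name)
    return order


submitted = [
    ("big-batch-1", 0),
    ("big-batch-2", 0),
    ("tiny-batch-1", 10),
    ("tiny-batch-2", 10),
]
print(run_order(submitted))
# ['tiny-batch-1', 'tiny-batch-2', 'big-batch-1', 'big-batch-2']
```

Even though the tiny batches were submitted last, they leave the queue first, which is the behavior being asked for.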
Kevin Kho
06/23/2022, 4:56 AM
We’d rather have new features go to Prefect 2 moving forward, and it’s already supported there, so we’re unlikely to implement this for Prefect 1. I think you might already be able to add a weight for one task, but not for mapping.
yu zeng
06/23/2022, 5:11 AM
Hi Kevin, thanks again. But what does 'weight' mean, and how do I set a weight for one task?
Kevin Kho
06/23/2022, 5:19 AM
So the Dask syntax is like this:

    with dask.annotate(priority=10):
        future = client.submit(func, *args)
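So you might be able to do it in a Prefect task like this:

    import dask
    from dask.distributed import worker_client
    from prefect import task

    @task
    def mytask():
        # Connect to the Dask cluster from inside the running Prefect task
        with worker_client() as client:
            # func and args stand in for the real work and its inputs
            with dask.annotate(priority=10):
                future = client.submit(func, *args)
            return future.result()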
not sure this would work
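As a point of comparison, Dask's Client.submit also accepts a priority keyword directly, so the annotation context is not the only way to express this. The sketch below runs outside Prefect against a small in-process cluster just to show the call shape; work and run_with_priority are hypothetical names, and whether the same call behaves as hoped inside a Prefect 1 task is untested here.

```python
from dask.distributed import Client


def work(x):
    """Stand-in for the real computation."""
    return x * 2


def run_with_priority(values, priority):
    """Submit one Dask task per value with the given priority and gather results."""
    # In-process cluster with the dashboard disabled; fine for a quick local check
    with Client(processes=False, dashboard_address=None) as client:
        futures = [client.submit(work, v, priority=priority) for v in values]
        return client.gather(futures)


print(run_with_priority([1, 2, 3], priority=10))
```

Priority only matters when more tasks are queued than the workers can run at once, so a three-task run like this shows the syntax rather than a visible reordering.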
yu zeng
06/23/2022, 5:39 AM
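    import dask
    from dask.distributed import worker_client
    from prefect import Flow, task

    @task
    def pre_task():
        return [1, 2, 3]

    @task
    def mytask(some_thing):
        # Submit the real work (func, defined elsewhere) as its own Dask task with priority 10
        with worker_client() as client:
            with dask.annotate(priority=10):
                future = client.submit(func, some_thing)
            return future.result()

    # Prefect 1 tasks must be called inside a Flow context; the flow name is a placeholder
    with Flow("priority-mapping") as flow:
        mytask.map(pre_task())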
Hi Kevin, thanks again. Following what you said, I think I can implement priority for mapped tasks this way?
Kevin Kho
06/23/2022, 5:48 AM
Maybe yes, but not super sure. It’s also a bit of double effort, because Prefect already uses client.submit() to submit a task, so you are submitting another one on top of it. It’s not ideal, but it might help your use case.