Jonathan Pou
08/04/2022, 9:01 PM

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
import dask

coiled_executor = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 4,
        "software": "ttibi-dev",
        "shutdown_on_close": True,
        "worker_vm_types": ["r6a.large"],
    },
    adapt_kwargs={"maximum": 10},
)

@task
def some_data_manipulation():
    df = dask.datasets.timeseries(
        "2000", "2020", partition_freq="2w"
    ).persist()
    df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
    return df

@flow(task_runner=coiled_executor)
def test_flow():
    some_data_manipulation.submit()

if __name__ == "__main__":
    test_flow()
Sam Dyson
08/09/2022, 11:09 PM

You can use worker_client inside the task, which will let the worker submit jobs to the cluster. I found this documentation helpful: https://distributed.dask.org/en/stable/task-launch.html#connection-with-context-manager
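
A minimal sketch of that worker_client pattern applied to the task from the snippet above (this is an assumption about how it would be wired up, not a confirmed fix; it assumes dask.distributed is available in the "ttibi-dev" software environment, and it carries over the same timeseries aggregation):

from dask.distributed import worker_client
from prefect import task
import dask

@task
def some_data_manipulation():
    # Under the DaskTaskRunner this task body executes on a Dask worker.
    # worker_client() hands the task a client connected to the same cluster
    # and secedes from the worker's thread pool, so the task can submit
    # work back to the cluster without blocking or deadlocking the worker.
    with worker_client() as client:
        df = dask.datasets.timeseries(
            "2000", "2020", partition_freq="2w"
        )
        df = client.persist(df)
        agg = client.compute(
            df.groupby("name").aggregate({"x": "sum", "y": "max"})
        ).result()
    # Return the computed (pandas) aggregate rather than the dask collection.
    return agg

With this pattern the flow-level DaskTaskRunner / Coiled configuration stays the same; only the task body changes to route its persist/compute calls through the worker's client.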