Joseph Mathes
12/09/2021, 10:41 PM
Kevin Kho
Kevin Kho
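import coiled
from prefect.executors import DaskExecutor

# Flow-level setup: Prefect creates the Coiled cluster when the flow run starts
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "kvnkho/prefect",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
    },
)
flow.executor = executor
flow.register(...)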
Kevin Kho
Joseph Mathes
12/10/2021, 12:17 AM
Joseph Mathes
12/10/2021, 12:18 AM
Joseph Mathes
12/10/2021, 12:19 AM
Joseph Mathes
12/10/2021, 12:21 AM
Joseph Mathes
12/10/2021, 12:21 AM
Joseph Mathes
12/10/2021, 12:24 AM
@prefect.task
def transform():
    # Create and connect to Coiled cluster
    cluster = coiled.Cluster(n_workers=10, name="prefect-task")
    client = Client(cluster)
^ This is the part that spins up a cluster through the Coiled API during task execution
Joseph Mathes
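The task above creates a cluster and a client but never closes either one. A minimal sketch of one way to guarantee cleanup, assuming only that the cluster and client objects expose a `close()` method (as `coiled.Cluster` and `dask.distributed.Client` do). The helper names `run_on_cluster` and `transform_body` are hypothetical and do not come from the thread:

```python
from contextlib import ExitStack


def run_on_cluster(make_cluster, make_client, work):
    """Create a cluster and a client, run work(client), and always close both."""
    with ExitStack() as stack:
        cluster = make_cluster()
        stack.callback(cluster.close)  # registered first, so it runs last
        client = make_client(cluster)
        stack.callback(client.close)  # registered last, so it runs first
        return work(client)


def transform_body():
    # Deferred imports: this function needs coiled and dask.distributed installed.
    import coiled
    from dask.distributed import Client

    return run_on_cluster(
        lambda: coiled.Cluster(n_workers=10, name="prefect-task"),
        Client,
        # Placeholder workload; replace with the real computation.
        lambda client: client.submit(sum, [1, 2, 3]).result(),
    )
```

Calling `transform_body()` from inside the `@prefect.task` function would keep the task-level pattern while making sure the cluster is shut down even if the work raises.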
12/10/2021, 12:24 AM
Kevin Kho
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "kvnkho/prefect",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
    },
)
flow.executor = executor
For task level, I suppose you can do that. It should work. We also have Resource Managers. If the task shows as pending indefinitely, it might be that your cluster just failed to spin up, which happens. Check that you are not using spot instances, and check in the Dashboard whether a cluster was created and whether it started successfully.
Kevin Kho
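The Resource Manager option mentioned above could look roughly like this. It is a sketch only, assuming Prefect 1.x (where `resource_manager` is importable from `prefect`) with Coiled and Dask installed. The names `cluster_settings`, `CoiledClient`, `build_flow`, and the flow name are hypothetical; the cluster name and worker count are the ones from the task snippet earlier in the thread:

```python
def cluster_settings(name="prefect-task", n_workers=10):
    """Keyword arguments passed to coiled.Cluster (values from the thread)."""
    return {"name": name, "n_workers": n_workers}


def build_flow():
    # Deferred imports: needs Prefect 1.x, coiled, and dask.distributed installed.
    import coiled
    from dask.distributed import Client
    from prefect import Flow, resource_manager, task

    @resource_manager
    class CoiledClient:
        def __init__(self, **cluster_kwargs):
            self.cluster_kwargs = cluster_kwargs

        def setup(self):
            # Runs before the tasks inside the `with` block.
            cluster = coiled.Cluster(**self.cluster_kwargs)
            return Client(cluster)

        def cleanup(self, client):
            # Runs after the tasks inside the block finish or fail.
            cluster = client.cluster
            client.close()
            cluster.close()

    @task
    def transform(client):
        # Placeholder workload; replace with the real computation.
        return client.submit(sum, [1, 2, 3]).result()

    with Flow("coiled-task-level") as flow:
        with CoiledClient(**cluster_settings()) as client:
            transform(client)
    return flow
```

Compared with creating the cluster inside the task body, this keeps setup and cleanup as separate steps in the flow, so a cluster that fails to start should surface as a failed setup step.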
Joseph Mathes
12/10/2021, 12:39 AM
Kevin Kho
Joseph Mathes
12/10/2021, 12:39 AM
Joseph Mathes
12/10/2021, 12:40 AM
Joseph Mathes
12/10/2021, 12:40 AM
Joseph Mathes
12/10/2021, 12:40 AM
Joseph Mathes
12/10/2021, 12:41 AM
Kevin Kho
Joseph Mathes
12/10/2021, 12:42 AM
Joseph Mathes
12/10/2021, 12:43 AM
Joseph Mathes
12/10/2021, 12:44 AM
Kevin Kho
Joseph Mathes
12/10/2021, 12:45 AM
Joseph Mathes
12/10/2021, 12:45 AM
Kevin Kho
Joseph Mathes
12/10/2021, 12:46 AM
Kevin Kho
Kevin Kho
Joseph Mathes
12/10/2021, 12:50 AM
Kevin Kho
Joseph Mathes
12/10/2021, 12:52 AM
Kevin Kho
Joseph Mathes
12/10/2021, 1:18 AM
Kevin Kho
Gus Cavanaugh
12/11/2021, 11:55 AM