David Martin Calalang
07/15/2025, 8:45 PM

from prefect import flow, task
from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": {image}
    },
    adapt_kwargs={
        "minimum": 1,
        "maximum": 8
    }
)
@task(name="foo")
def square(x):
    return x * x

@flow()
def process_numbers():
    numbers = list(range(10))
    futures = [square.submit(n) for n in numbers]
    results = [future.result() for future in futures]
    print(f"Squared numbers: {results}")
    return results

if __name__ == "__main__":
    process_numbers.with_options(task_runner=task_runner)()
    input("END:")
My AWS credentials and region are set via environment variables. This implementation correctly creates a new cluster on AWS ECS that hosts the scheduler and spawns workers, and it brings up the Dask dashboard showing those workers. The dashboard also shows the ten square tasks moving to a "processing" state on the workers.
The issue is that they appear to stay stuck in that state. Moreover, the Prefect UI shows no progress, only the message "This flow run has not yet generated any task or subflow runs".
I understand there is obviously overhead in creating and provisioning resources on ECS for the scheduler and workers, but on my latest run I left it open for 15 minutes and it still made no progress. Am I missing something in my task runner configuration?
Note that this implementation works correctly and runs quickly (< 2 seconds) when simply passing in DaskTaskRunner without a cluster_class (i.e., creating a local cluster).