badasstronaut
09/29/2022, 6:51 PM
prefect-aws and prefect-dask + dask-cloudprovider for autoscaling

Toby Rahloff
09/30/2022, 8:04 AM

Barada Sahu
09/30/2022, 10:29 AM

George Coyne
09/30/2022, 3:06 PM

badasstronaut
09/30/2022, 3:14 PM
distributed fire_and_forget: we’ve crashed at 16 GB scheduler memory.
@Barada Sahu We’ve got a CI/CD pipeline set up against a Dockerfile. The Dockerfile uses a build_arg to provide the upstream tag, so we can bump versions easily. Then we put our apt and pip dependencies in RUN layers within the Dockerfile. This gets pushed to an ECR repository, and the image address gets provided to the FargateCluster class.
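For reference, a minimal sketch of that last hand-off; the ECR image URI (account, region, tag) is a placeholder:

from dask_cloudprovider.aws import FargateCluster
from distributed import Client

# Placeholder ECR image URI produced by the CI/CD pipeline
image = "123456789012.dkr.ecr.us-east-1.amazonaws.com/dask-workers:latest"

cluster = FargateCluster(image=image, fargate_spot=True)
cluster.adapt(minimum=1, maximum=10)  # autoscale workers with the workload
client = Client(cluster)
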
Shay ben-sasson
10/07/2022, 6:32 AM
dask-cloudprovider was too buggy for us, so we created the dask cluster using pulumi (we first deploy our image to ECR, then we make sure that ECS fetches our image from ECR).
• We use spot instances for the workers.
• Prefect is only getting the dask cluster URI (once the IaC run is complete) and that's it (see the sketch after this list).
• The cluster is ephemeral (it goes up and down on demand).
• It is working pretty well - yet we have learned many things along the way:
◦ The dask architecture has its bottlenecks; to our knowledge, only one scheduler is supported, which means we had to make sure it had enough CPU and memory for our heavy ETL workload.
◦ AWS Fargate (and spot instances in particular) has its moods - sometimes we ask for 550 instances and, after 1/4 of the files we use in our distributed ETL workflow, we are left with 70% of the workers (that's life when you want to keep it cheap).
◦ Sometimes the cluster has to be restarted because something happened (Prefect 1.0 reported a failed flow, dask has its problems, AWS has problems, etc.).
◦ We monitor the workers' logs using a tool called saw, which watches CloudWatch and looks for tokens we injected at the important points we'd like to capture.
▪︎ Because of our scale, nothing else was good enough for babysitting - the Prefect 1.0 Cloud UI could not handle the scale, AWS "container insights" was not informative enough, and the dask scheduler was so occupied that its dashboard became unresponsive.
◦ It is important to think about resilience upfront - your state must be stored outside the workers, hence we broadcast the results to S3 when we complete the work.
◦ We also have a bucket for cache, which a given run can query if an earlier cluster run did not finish, so it can continue the workload.
• We are currently experimenting with Prefect 2.0 and it looks very promising - we might retry the proposed sequence diagram architecture again - although we are very happy with the one we have.
• We switched to Prefect 2.0 and we use the flow-of-flows pattern, so that when a CI/CD flow is triggered, the Prefect dataflow run runs our ML pipeline from raw data, through our ETL, training our models and evaluating them to produce evaluation reports.
◦ This complex distributed dataflow could only be achieved with a tool like Prefect 2.0 (we have CPU-bound subflows, e.g. ETL, and we have memory-intensive workflows we use EC2 for, e.g. creating feature training/validation/testing sets).
◦ The async abilities enable us to execute awaited web requests and even fire-and-forget commands like pulumi destroy.
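A minimal sketch of that hand-off (the scheduler URI is a placeholder; in practice it would come from the pulumi stack outputs):

from prefect import flow, task
from prefect_dask import DaskTaskRunner

# Placeholder: the address of the already-provisioned Dask scheduler
SCHEDULER_URI = "tcp://dask-scheduler.internal:8786"

@task
def transform(chunk: int):
    ...  # one unit of the distributed ETL work

@flow(task_runner=DaskTaskRunner(address=SCHEDULER_URI))
def etl():
    # Prefect only needs the scheduler address; the cluster lifecycle is managed elsewhere
    for chunk in range(100):
        transform.submit(chunk)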
@badasstronaut Prefect wraps the Dask functionality, so how do you use the distributed library's fire_and_forget? Do you use dask directly (without Prefect)?

badasstronaut
10/07/2022, 3:31 PM
dask-cloudprovider is buggy, and we’ve had to do some inheritance spelunking to get it working the way we need. We had a standalone dask cluster with adaptive scaling, but I like the cluster-per-flow execution. We had to deal with a lot of scheduler crashes, which then left the autoscaling logic in a hung state. Getting the system to recover on its own wasn’t too hard, but it still interrupts the data team's transform runs.
Using Dask within Prefect: https://prefecthq.github.io/prefect-dask/#distributing-dask-collections-across-workers
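The pattern in that link looks roughly like this; the task, helper function, and path are placeholders:

from distributed import fire_and_forget
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

def side_effect(path: str):
    ...  # e.g. write an audit record; nothing downstream waits on this

@task
def process(path: str):
    with get_dask_client() as client:      # client connected to the flow's own Dask cluster
        future = client.submit(side_effect, path)
        fire_and_forget(future)            # let it run without gathering the result

@flow(task_runner=DaskTaskRunner())
def pipeline():
    process.submit("s3://bucket/key")
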
Toby Rahloff
10/07/2022, 3:50 PM

badasstronaut
10/07/2022, 4:05 PM
@flow(task_runner=DaskTaskRunner(
    cluster_class='dask_cloudprovider.aws.FargateCluster',
    cluster_kwargs=dict(image=image,
                        fargate_spot=True,
                        cluster_arn=cluster_arn,
                        execution_role_arn=execution_role_arn,
                        task_role_arn=task_role_arn,
                        cloudwatch_logs_group=cloudwatch_logs_group,
                        vpc=vpc,
                        security_groups=security_groups,
                        fargate_use_private_ip=True,
                        skip_cleanup=True),
    # Enable adaptive scaling up to 20 worker containers
    adapt_kwargs=dict(minimum=1, maximum=20)))
I think, upon reflection, Prefect could give us some powerful options around these problems. The bottleneck is always the scheduler, right? But you don’t want to over-provision. Within a flow, you could monitor the scheduler’s memory; when the memory hits a threshold, shut it down, scale it up, and re-run the same task sets. You could vertically scale the scheduler to right-size per flow, though it could take a long-ass time…
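A rough sketch of what that monitoring could look like; the helper names and the 12 GB threshold are hypothetical:

import psutil
from distributed import Client

def _scheduler_rss_gb() -> float:
    # Runs inside the scheduler process
    return psutil.Process().memory_info().rss / 1e9

def scheduler_memory_gb(client: Client) -> float:
    return client.run_on_scheduler(_scheduler_rss_gb)

# Hypothetical guard inside a flow:
# if scheduler_memory_gb(client) > 12:
#     checkpoint_to_s3()   # made-up helper
#     ...                  # recreate the cluster and resubmit the remaining task sets
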
Anna Geller
10/07/2022, 5:42 PM
run_deployment, a method available in the 2.5 release to trigger runs from deployments, potentially allowing each subflow to run in an individual container with up to 120 GB of memory, all serverless without distributed or Kubernetes clusters.
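A minimal sketch of that pattern; the deployment name and parameters are placeholders:

from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # Each call creates a flow run from an existing deployment, so every
    # subflow can land on its own container with its own memory allocation.
    for dataset in ["raw_a", "raw_b"]:
        run_deployment(name="etl/heavy-transform", parameters={"dataset": dataset})
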
badasstronaut
10/07/2022, 5:48 PM

Anna Geller
10/07/2022, 5:51 PM

badasstronaut
10/07/2022, 5:59 PM

Anna Geller
10/07/2022, 7:11 PM

Toby Rahloff
10/10/2022, 9:52 AM

Anna Geller
10/10/2022, 12:13 PM

Toby Rahloff
10/10/2022, 12:16 PM

Anna Geller
10/10/2022, 12:53 PM

Toby Rahloff
10/10/2022, 1:23 PM

badasstronaut
10/10/2022, 2:51 PM
RuntimeError too. Seeing as it’s firing when the flow run is complete, and it’s not leaving any leftover infra, I think it’s acceptable for now.
...
2022-10-05 19:23:44,296 - distributed.deploy.adaptive_core - INFO - Adaptive stop
2022-10-05 19:24:20,328 - distributed.deploy.adaptive_core - INFO - Adaptive stop
19:24:20.328 | INFO | Flow run 'free-yak' - Finished in state Completed('All states completed.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/weakref.py", line 667, in _exitfunc
    f()
  File "/usr/local/lib/python3.9/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 362, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/weakref.py", line 667, in _exitfunc
    f()
  File "/usr/local/lib/python3.9/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 362, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed

Toby Rahloff
10/10/2022, 3:28 PM

badasstronaut
10/10/2022, 3:55 PM
dask-cloudprovider that doesn’t throw IOLoop errors--maybe not with Prefect though. I’m suspicious about several things. Where does prefect-dask close the FargateCluster? Note that prefect-dask is using AsyncExitStack (https://docs.python.org/3/library/contextlib.html#contextlib.AsyncExitStack). From the docs: "The close() method is not implemented, aclose() must be used instead."
It looks like dask-cloudprovider's FargateCluster does not implement aclose! I want to test this by inheriting from FargateCluster, adding an aclose method, and calling cluster._close() from within aclose. I think that might fix this issue? I’ll check back here if I get time to test this today…
exit_stack: https://github.com/PrefectHQ/prefect-dask/blob/main/prefect_dask/task_runners.py#L294
ECSCluster: https://github.com/dask/dask-cloudprovider/blob/main/dask_cloudprovider/aws/ecs.py#L448
SpecCluster (inherited by ECSCluster): https://github.com/dask/distributed/blob/main/distributed/deploy/spec.py#L111
Cluster (inherited by SpecCluster): https://github.com/dask/distributed/blob/main/distributed/deploy/cluster.py
Nothing implements aclose.
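A quick way to check that claim from a REPL (the result depends on the installed dask-cloudprovider/distributed versions):

from dask_cloudprovider.aws import FargateCluster

# At the versions discussed here this reportedly prints False:
# FargateCluster exposes close()/_close(), but no aclose().
print(hasattr(FargateCluster, "aclose"))
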
Chris Gunderson
10/10/2022, 8:11 PM

Toby Rahloff
10/11/2022, 6:53 AM

Chris Gunderson
10/11/2022, 12:48 PM

Toby Rahloff
10/20/2022, 9:46 AM

badasstronaut
10/20/2022, 3:38 PM
aclose on FargateCluster:
class ClusterAclose(dask_cloudprovider.aws.FargateCluster):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def aclose(self):
        await super()._close()
Still got the issue 🤔.
Observation: Dask has a sync call defined in a SyncMethodMixin that gets used by Cluster and Client: https://github.com/dask/distributed/blob/main/distributed/utils.py#L313
This is the function throwing the error we see in the logs. So I am wondering: the Dask task runner, when it starts, instantiates Cluster and Client in a context manager (https://github.com/PrefectHQ/prefect-dask/blob/main/prefect_dask/task_runners.py#L275):
self._connect_to = self._cluster = await exit_stack.enter_async_context(
    self.cluster_class(asynchronous=True, **self.cluster_kwargs)
)
if self.adapt_kwargs:
    self._cluster.adapt(**self.adapt_kwargs)
self._client = await exit_stack.enter_async_context(
    distributed.Client(
        self._connect_to, asynchronous=True, **self.client_kwargs
    )
)
I’m wondering: if the run is complete and both of those context managers are closing (i.e. the cluster and the client), and they both call sync, then once the cluster has been exited, of course the sync method from the Client instance is going to throw this error, right?
prefect-dask: https://github.com/PrefectHQ/prefect-dask/issues/46

Zanie
10/20/2022, 4:16 PM

Toby Rahloff
10/21/2022, 7:24 AM

badasstronaut
10/27/2022, 6:10 PM