
badasstronaut

09/29/2022, 6:51 PM
Can I get feedback/corrections on this sequence diagram? I’m using prefect-aws and prefect-dask + dask-cloudprovider for autoscaling.
👀 1

Toby Rahloff

09/30/2022, 8:04 AM
Looks great! How many tasks do you plan to run in parallel? Depending on the load pattern, it might make sense to run a couple of stress tests to see how the Dask head node behaves under a production-like load so you can right-size it (memory can become a bottleneck).
💯 1

Barada Sahu

09/30/2022, 10:29 AM
Are you also planning on executing tasks that have dependencies, and if so, how do you plan on resolving them in the Dask cluster?

George Coyne

09/30/2022, 3:06 PM
Diagram-wise this is exactly how Prefect works with dask-cloudprovider.
🙏🏻 1
🙏 1

badasstronaut

09/30/2022, 3:14 PM
Thanks: Consider this comment permission to copy/modify whatever for any use cases. @Toby Rahloff We’re using autoscaling, so we can dynamically allocate max worker counts. As you mentioned, memory can become a bottleneck, especially for the scheduler. We have an independent Dask cluster that scales up to 100 workers, and depending on the payloads returned from tasks and whether or not the tasks are invoked with distributed.fire_and_forget, we’ve crashed at 16 GB of scheduler memory. @Barada Sahu We’ve got a CI/CD pipeline set up against a Dockerfile. The Dockerfile uses a build_arg to provide the upstream tag, so we can bump versions easily. Then we put our apt and pip dependencies in RUN layers within the Dockerfile. This gets pushed to an ECR repository, and the image address gets provided to the FargateCluster class.
🎉 2

Shay ben-sasson

10/07/2022, 6:32 AM
We are using a similar mechanism, yet it is a bit different:
• When we started developing this workflow (a year and a half ago), we felt that dask-cloudprovider was too buggy for us, so we created the Dask cluster using pulumi (we first deploy our image to ECR, then we make sure that ECS fetches our image from ECR).
• We use spot instances for the workers.
• Prefect only gets the Dask cluster URI (once the IaC run is completed) and that's it (roughly the pattern sketched after this message).
• The cluster is ephemeral (it goes up and down on demand).
• It is working pretty well, yet we have learned many things along the way:
◦ The Dask architecture has its bottlenecks; to our knowledge, only one scheduler is supported, which means we had to make sure it had enough CPU and memory for our heavy ETL workload.
◦ AWS Fargate (and spot instances in particular) has its moods: sometimes we ask for 550 instances and, after a quarter of the files in our distributed ETL workflow, we are left with 70% of the workers (that's life when you want to keep it cheap).
◦ Sometimes the cluster has to be restarted because something happened (Prefect 1.0 reported a failed flow, Dask has its problems, AWS has problems, etc.).
◦ We monitor the worker logs using a tool called saw, which watches CloudWatch and looks for tokens we injected at important points we'd like to capture.
▪︎ Because of our scale, nothing else was good enough for babysitting: the Prefect 1.0 Cloud UI could not handle the scale, AWS "container insights" was not informative enough, and the Dask scheduler was so busy that its dashboard became unresponsive.
◦ It is important to think about resilience upfront: your state must be stored outside the workers, hence we broadcast the results to S3 when we complete the work.
◦ We also have a bucket for cache that a given run can query if an earlier cluster run did not finish, so it can continue the workload.
• We are currently experimenting with Prefect 2.0 and it looks very promising; we might retry the proposed sequence diagram architecture again, although we are very happy with the one we have.
• We switched to Prefect 2.0 and we use the flow-of-flows pattern, so that when a CI/CD flow is triggered, the Prefect dataflow run runs our ML pipeline from raw data, through our ETL, training our models and evaluating them to produce evaluation reports.
◦ This complex distributed dataflow could only be achieved with a tool like Prefect 2.0 (we have CPU-bound subflows, e.g. ETL, and we have memory-intensive workflows we use EC2 for, e.g. creating feature training/validation/testing sets).
◦ The async abilities enable us to execute awaited web requests and even fire-and-forget things like the pulumi destroy command.
@badasstronaut Prefect wraps the Dask functionality, so how do you use the distributed library's fire_and_forget? Do you use Dask directly (without Prefect)?
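For illustration, the "Prefect only gets the Dask cluster URI" pattern above might look roughly like this with prefect-dask, assuming the DaskTaskRunner address parameter for attaching to an existing cluster; the scheduler URI and function names below are placeholders, not Shay's actual code:

from prefect import flow, task
from prefect_dask import DaskTaskRunner

# Placeholder: the scheduler URI emitted by the IaC run (pulumi, in this setup)
# once the ECS services for the scheduler and workers are up.
DASK_SCHEDULER_URI = "tcp://dask-scheduler.internal:8786"


@task
def transform(chunk):
    ...


# Attach to the already-provisioned cluster instead of creating one per flow run.
@flow(task_runner=DaskTaskRunner(address=DASK_SCHEDULER_URI))
def etl_flow(chunks: list):
    for chunk in chunks:
        transform.submit(chunk)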

badasstronaut

10/07/2022, 3:31 PM
@Shay ben-sasson Thanks for sharing. I agree dask-cloudprovider is buggy, and we’ve had to do some inheritance spelunking to get it working the way we need. We had a standalone Dask cluster with adaptive scaling, but I like the cluster-per-flow execution. We had to deal with a lot of scheduler crashes, which then left the autoscaling logic in a hung state. Getting the system to recover on its own wasn’t too hard, but it still interrupts the data team’s transform runs. Using Dask within Prefect: https://prefecthq.github.io/prefect-dask/#distributing-dask-collections-across-workers
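For anyone following along, a rough sketch of that pattern (not badasstronaut's actual code), assuming prefect_dask.get_dask_client as described in the linked docs plus distributed.fire_and_forget; the function names are made up:

from distributed import fire_and_forget
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


def write_partition(partition):
    ...  # e.g. persist one partition to S3


@task
def kick_off_background_writes(partitions: list):
    # get_dask_client() returns a client wired to the same cluster the
    # task runner is using, so submissions land on the Dask workers.
    with get_dask_client() as client:
        futures = client.map(write_partition, partitions)
        # Tell the scheduler to keep these tasks running even after the
        # local future references go away.
        fire_and_forget(futures)


@flow(task_runner=DaskTaskRunner())
def my_flow(partitions: list):
    kick_off_background_writes.submit(partitions)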

Toby Rahloff

10/07/2022, 3:50 PM
@badasstronaut Do you have a version of dask-cloudprovider that works the way you want it to? We also gave it a try with Kubernetes and ECS Fargate - both were complete trainwrecks for us and definitely not prod-ready. We kinda abandoned the whole setup because we need the one-cluster-per-flow-run; otherwise the scheduler always runs into bottlenecks (we saw a Dask scheduler capping out at 256 GB of memory when lab-testing our expected workloads, that was fun...)
btw, "inheritance spelunking" is an amazing term. Made my day 👍
😂 2

badasstronaut

10/07/2022, 4:05 PM
@Toby Rahloff we’re just using the provided version:
from prefect import flow
from prefect_dask import DaskTaskRunner

# image, cluster_arn, and the other variables below are defined elsewhere in the
# surrounding deployment code.
@flow(task_runner=DaskTaskRunner(cluster_class='dask_cloudprovider.aws.FargateCluster',
                                 cluster_kwargs=dict(image=image,
                                                     fargate_spot=True,
                                                     cluster_arn=cluster_arn,
                                                     execution_role_arn=execution_role_arn,
                                                     task_role_arn=task_role_arn,
                                                     cloudwatch_logs_group=cloudwatch_logs_group,
                                                     vpc=vpc,
                                                     security_groups=security_groups,
                                                     fargate_use_private_ip=True,
                                                     skip_cleanup=True),
                                 # Enable adaptive scaling up to 20 worker containers
                                 adapt_kwargs=dict(minimum=1, maximum=20)))
def my_flow():  # placeholder name; the decorator config is the relevant part
    ...
I think, upon reflection, Prefect could give us some powerful options around these problems. The bottleneck is always the scheduler, right? But you don’t want to over-provision. Within a flow, you could monitor the scheduler’s memory: when it hits a threshold, shut the cluster down, scale it up, and re-run the same task sets. You could also vertically scale the scheduler to right-size it per flow, though that could take a long-ass time…
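One way that kind of scheduler-memory check could be probed from within a flow -- a sketch only, not an existing Prefect or Dask feature; it assumes psutil is installed on the scheduler image, and the helper names and the 12 GB threshold are made up:

import psutil
from distributed import Client


def _scheduler_rss_bytes() -> int:
    # Shipped to, and executed inside, the scheduler process itself.
    return psutil.Process().memory_info().rss


def scheduler_under_pressure(client: Client, threshold_gb: float = 12.0) -> bool:
    # run_on_scheduler executes the callable on the scheduler and returns the result.
    used_gb = client.run_on_scheduler(_scheduler_rss_bytes) / 1e9
    return used_gb > threshold_gb

A flow could poll something like this between batches of submitted tasks and decide whether to stop submitting, resize the cluster, or tear it down and retry the remaining task sets.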

Anna Geller

10/07/2022, 5:42 PM
fwiw I think these days, with ECS Fargate able to run containers with 120 GiB of memory plus the super convenient concurrency available out of the box with Prefect, I would seriously evaluate whether scaling out with Dask or Ray is needed -- running things in a distributed way always results in significantly higher complexity
👍 1
you can easily combine this with the run_deployment method available in the 2.5 release to trigger runs from deployments, potentially allowing each subflow to run in an individual container with up to 120 GB of memory, all serverless without distributed or Kubernetes clusters
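A minimal sketch of what that might look like; the deployment name and parameters are hypothetical:

from prefect.deployments import run_deployment

# Each call creates a flow run from an existing deployment, so every chunk of
# work gets its own container (e.g. an ECS task sized up to 120 GB of memory).
for month in ["2022-08", "2022-09", "2022-10"]:
    run_deployment(
        name="etl/process-month",  # hypothetical "<flow name>/<deployment name>"
        parameters={"month": month},
        timeout=0,  # return immediately instead of waiting for the run to finish
    )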

badasstronaut

10/07/2022, 5:48 PM
@Anna Geller I asked about that too, but found that this perspective overlooks the convenience/use of Dask abstractions such as Dask DataFrame and Dask Array. If we only wanted the concurrent execution of futures, I would consider eliminating Dask from our infra.

Anna Geller

10/07/2022, 5:51 PM
You could leverage an on-demand Dask cluster spun up within a single Fargate container, effectively running dask without distributed and without dask cloud provider -- makes things much easier as you effectively only leverage multiprocessing/multithreading on a single node
this could be:
• creating an ECSTask with, say, 120 GB of memory
• building a flow with DaskTaskRunner (but without the Fargate dask-cloudprovider)
• using everything you love about Dask without managing the complexity of distributed
Not saying this is a better option, just something worth considering and playing around with -- might be worth a shot to simplify things (rough sketch below)
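That could look something like the following; with no cluster_class, DaskTaskRunner stands up a local Dask cluster inside whatever container the flow runs in, and the worker/thread counts here are placeholder values to tune against the container size:

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def transform(partition):
    ...


# No cluster_class and no dask-cloudprovider: the task runner creates a local
# distributed cluster inside this one (large) container and tears it down afterwards.
@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 2},
    )
)
def single_node_dask_flow(partitions: list):
    for partition in partitions:
        transform.submit(partition)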

badasstronaut

10/07/2022, 5:59 PM
True, if you’re not constrained by 120 GB for your data set. I agree with simplification, and we’re definitely going to have local Prefect + Dask workloads that we test before deploying. This configuration is specifically for large, long-running workloads with big memory requirements. The start-up time of the containers (sometimes 2+ min each!) is worth the trade-off and complexity. I am impressed at how using Prefect enables us to see what’s going on across a heterogeneous architecture. Being able to offload tasks to Dask and then know what failed if the scheduler runs out of memory helps a lot. In other words, the ability to manage complexity proves almost as useful as reducing complexity!
👍 3

Anna Geller

10/07/2022, 7:11 PM
I know what you mean, I even have this extra recipe to allow running some flows directly in the agent container to reduce the latency of spinning up Fargate containers
even though in 2.0 this latency is reduced, I saw max 60sec latency, not more than that

Toby Rahloff

10/10/2022, 9:52 AM
Hi @badasstronaut, I gave our old Prefect+Dask+ECS-Fargate setup another go (with your DaskTaskRunner config above and updated Prefect packages) and ran into a super strange issue we also saw a couple of months ago. Basically, everything works fine at first. The ECS cluster gets created, the scheduler runs, the worker node runs, the Prefect flow runs and even shows up as "succeeded" in the UI but then the FlowRunner/Agent runs into an "IOLoop is closed" error. Did you and your team ever encounter this particular issue?

Anna Geller

10/10/2022, 12:13 PM
This error is a bug from the dask cloud provider when tearing down the cluster after the flow run is complete, we saw a similar issue here -- it's not a Prefect issue, might be worth filing a bug report in the distributed repo. Thanks for flagging that the issue still exists in v2, @Toby Rahloff

Toby Rahloff

10/10/2022, 12:16 PM
@Anna Geller Interesting! Are there any workarounds you are aware of?

Anna Geller

10/10/2022, 12:53 PM
Only those mentioned in the issue. I definitely hear you, this isn't optimal, but we are also limited in what we can do here, as this is an issue with distributed -- I'd recommend opening an issue there to get feedback from their maintainers. If they say we can do something on the Prefect side to mitigate the issue, you could open an issue on the prefect-dask repo and we could prioritize a fix

Toby Rahloff

10/10/2022, 1:23 PM
Will do. I just noticed something odd, will investigate and do a quick write-up. If you run it with a minimal DaskCluster config (= dask-cloudprovider creates a new cluster), you get 4 IOLoop errors. When setting all the params that @badasstronaut has in his example code above in this thread, you only get one IOLoop error (🤯). So, I'm pretty sure a good workaround could be to create the IAM role, etc. once and then reference it. The flow would still create a new Dask cluster (w/ isolated scheduler & worker nodes) per flow run. The only downside would be that you need to create some fluff beforehand - totally fine from my perspective. It seems like there is just one more config missing to fix/work around this (one IOLoop error remains), but for the love of god, I cannot find the missing config that blocks it. @badasstronaut any idea whether I'm on the right track here? It seems like you folks have played and mastered the "debug esoteric Dask errors" game 😄
😆 1
🙏 1

badasstronaut

10/10/2022, 2:51 PM
@Toby Rahloff I am getting the RuntimeError too. Seeing as it’s firing when the flow run is complete, and it’s not leaving any leftover infra, I think it’s acceptable for now.
...2022-10-05 19:23:44,296 - distributed.deploy.adaptive_core - INFO - Adaptive stop
2022-10-05 19:24:20,328 - distributed.deploy.adaptive_core - INFO - Adaptive stop
19:24:20.328 | INFO    | Flow run 'free-yak' - Finished in state Completed('All states completed.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/weakref.py", line 667, in _exitfunc
    f()
  File "/usr/local/lib/python3.9/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 362, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/weakref.py", line 667, in _exitfunc
    f()
  File "/usr/local/lib/python3.9/weakref.py", line 591, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
    return sync(
  File "/usr/local/lib/python3.9/site-packages/distributed/utils.py", line 362, in sync
    raise RuntimeError("IOLoop is closed")
RuntimeError: IOLoop is closed

Toby Rahloff

10/10/2022, 3:28 PM
Ah, got it. So in your experience, you can just ignore this error even for production use? That's good to know. We have some beefy flow runs coming up, I'll just give it a go and see what happens 😄 Do you know whether my gut feeling about "if I just supply enough config, the error goes away" could be right?

badasstronaut

10/10/2022, 3:55 PM
We’re not at “production use” at scale just yet. I think, as with everything in our industry, “it depends” is about the only reasonable answer. I think giving it a go and seeing what happens makes sense because… I don’t have a much better way of addressing these issues and getting these capabilities… a buggy solution is better than no solution, right? So, I’m sure there is a magical config of dask-cloudprovider that doesn’t throw IOLoop errors -- maybe not with Prefect, though. I’m suspicious about several things. Where does prefect-dask close the FargateCluster? Note that prefect-dask is using AsyncExitStack (https://docs.python.org/3/library/contextlib.html#contextlib.AsyncExitStack). From the docs: “The close() method is not implemented, aclose() must be used instead.” It looks like dask-cloudprovider’s FargateCluster does not implement aclose! I want to test this by inheriting from FargateCluster, adding an aclose method, and calling cluster._close() from within aclose. I think that might fix this issue? I’ll check back here if I get time to test this today…
🙏 1

Chris Gunderson

10/10/2022, 8:11 PM
@George Coyne Great thread! If we are not using the ECS Dask Scheduler or the ECS Dask Worker, these tasks are coming from ECS, not Prefect, correct?

Toby Rahloff

10/11/2022, 6:53 AM
@badasstronaut please remind me that I owe you a beer next time we are at the same conference. Amazing write-up! Now I understand what you meant with "inheritance spelunking" 😂 Reads like a Sherlock Holmes novel
💯 2
🙏 1

Chris Gunderson

10/11/2022, 12:48 PM
@badasstronaut I second Toby's comment.
🙏 1

Toby Rahloff

10/20/2022, 9:46 AM
Hi @badasstronaut, did you by any chance give this another try?

badasstronaut

10/20/2022, 3:38 PM
@Toby Rahloff thanks for the reminder. I had to work on non-Prefect infra over the last week, unfortunately. So, I implemented aclose on FargateCluster:
import dask_cloudprovider.aws


class ClusterAclose(dask_cloudprovider.aws.FargateCluster):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def aclose(self):
        await super()._close()
Still got the issue 🤔. Observation: Dask has a sync call defined in a SyncMethodMixin that gets used by Cluster and Client: https://github.com/dask/distributed/blob/main/distributed/utils.py#L313 This is the function throwing the error we see in the logs. So I am wondering: the Dask task runner, when it starts, instantiates the Cluster and the Client in async context managers (https://github.com/PrefectHQ/prefect-dask/blob/main/prefect_dask/task_runners.py#L275):
self._connect_to = self._cluster = await exit_stack.enter_async_context(
    self.cluster_class(asynchronous=True, **self.cluster_kwargs)
)
if self.adapt_kwargs:
    self._cluster.adapt(**self.adapt_kwargs)

self._client = await exit_stack.enter_async_context(
    distributed.Client(
        self._connect_to, asynchronous=True, **self.client_kwargs
    )
)
I’m wondering: if the run is complete and both of those context managers are closing (i.e. the cluster and the client), and they both call sync, then once the cluster has been exited, of course the sync method on the Client instance is going to throw this error, right?
🙌 1
I feel confident enough that this is the right track that I opened a GitHub issue for prefect-dask: https://github.com/PrefectHQ/prefect-dask/issues/46
🙌 1

Zanie

10/20/2022, 4:16 PM
I’ve replied on the issue, but I’ve also reached out to our partners at Dask to discuss a fix.
🙏 5
👍 6
🙌 2

Toby Rahloff

10/21/2022, 7:24 AM
Thanks a lot! You are the best 👍
💯 3
👍 3
upvote 4

badasstronaut

10/27/2022, 6:10 PM
…I’m stupid impressed with the Prefect team after all this. This is about the best example of open-source/cross-org community collaboration I’ve had the privilege to participate in. Keep up the awesome work!
🙌 4
gratitude thank you 2
❤️ 9