Hey prefect people, a `resource manager` related ...
# ask-community
r
Hey prefect people, a `resource manager` related question: I am trying to set up a resource manager to do some larger-than-memory computations within a task. I wanted to start simple, but even the example gives the following error for me:
multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
k
Hey again @Robin! Just a friendly reminder if we can move logs and large code blocks to threads to keep the main channel clean. I think this thread might be helpful. The TLDR is to try running the flow inside `if __name__ == "__main__":`.
Man, you must be doing a lot of interesting Prefect-related work today!
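For readers following along, here is a minimal sketch of the idiom from the traceback, using only the standard library rather than Prefect or Dask. The function names `greeting`, `greet` and `main` are made up for illustration. The assumption is that anything which starts child processes (here a `multiprocessing.Process`, in the thread above a flow run that starts Dask workers) sits behind the guard, because under the "spawn" start method each child re-imports the main module.

```python
import multiprocessing as mp


def greeting(name):
    """Pure helper, safe to run at import time."""
    return f"hello from a child process, {name}"


def greet(name):
    """Target for the child process; defined at module level so the child can import it."""
    print(greeting(name))


def main():
    # Starting a process is the step that must not happen while the module
    # is still being imported by a spawned child.
    child = mp.Process(target=greet, args=("prefect",))
    child.start()
    child.join()
    return child.exitcode


if __name__ == "__main__":
    # Only the parent process runs this branch; a re-importing child skips it.
    main()
```

Without the guard, a spawned child would reach the process-starting code again while it is still importing the module, which is the situation the `RuntimeError` above describes.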
r
Haha yeah 🙂 Finally got the time to deep dive 🙏
Hey again @Robin! Just a friendly reminder if we can move logs and large code blocks to threads to keep the main channel clean.
Sure! Thought code snippet would be OK but agree that it is even cleaner when they are in the thread 👍
k
Either the log or the code works. Just not both in the main channel I think. 😅
🙏 1
I do appreciate the level of detail though.
a
(I wish Slack made it more apparent to use threads rather than top posting when that's the preference of the people in the channel)
👍 2
r
Thanks, it works! I just edited the code snippet with the respective lines. I suggest to update the linked documentation accordingly, saving everyone some trouble 🙂
k
This is not specific to resource managers. Unfortunately, I think it is related to the OS and Dask.
The other thread had the same issue with a DaskExecutor without the resource manager. There is a GitHub issue link in that thread.
Whoops, I just saw that the previous poster lost parallelism in his Flow by adding the `if __name__ == "__main__":` guard. I can't imagine why, but do watch out for it.
Ignore this I think you’re good
r
OK 🙂
I am now stuck on running it on prefect cloud:
1. Trying `prefect agent local start` to run it via prefect cloud on the local machine, but getting `prefect.utilities.exceptions.AuthorizationError: No agent API token provided.` Sounds like an outdated error message, now that service accounts replace API tokens?
2. How to reconfigure the resource manager to work within a `KubernetesRun`?
3. How to set up the resource manager such that the related dask cluster can actually run distributed on many kubernetes pods?
k
What version are you on?
r
prefect?
k
Yep
r
0.14.17
pip list
k
I will look into 1 for you. I don't see people using resource managers with Kubernetes; they create the cluster and pass it to the DaskExecutor like this.
Did you configure the auth this way?
r
OK, we normally do it similarly:
elif executor == "dask":
        # Create Dask Executor

        def make_cluster(n_workers, image=None):
            """Start a cluster using the same image as the flow run"""

            pod_spec = make_pod_spec(
                image=image or prefect.context.image,  # default to current active image
                memory_limit=memory_limit,
                memory_request=memory_request,
                cpu_limit=cpu_limit,
                cpu_request=cpu_request,
            )
            return KubeCluster(
                pod_spec,
            )

        flow.executor = DaskExecutor(
            cluster_class=make_cluster,
            adapt_kwargs={"minimum": 0, "maximum": n_workers},
        )
However, in this case I would like to get several pods to work on the computation for a single task. Does that make sense? If I understood the DaskExecutor correctly, the above example runs each task within one pod of the dask-kubernetes cluster, on one dask worker? But we have single analyses that require e.g. 200 GB of RAM. Therefore, I would like to run that analysis with dask, so that it can run on pods with e.g. 4 GB of memory each, ideally distributed across many such pods. I thought this was possible via a resource manager, but I am also totally open to other solutions. What it comes down to:
• How does prefect envision intra-task parallelism?
• (as opposed to inter-task parallelism = task.map, where each task runs on one kubernetes pod)
k
Gotcha, will ask the team about this.
r
OK, thanks a lot!
I just created the `config.toml`, just containing
[cloud.agent]
auth_token = PV7LJn6u7ersIFQ
which returns the following error when invoking `prefect agent local start`:
toml.decoder.TomlDecodeError: This float doesn't have a leading digit (line 2 column 1 char 14)
I am running on Windows currently, but could also test on Apple M1 🤷
k
I think you should hide the token (unless that's fake). It looks right to me.
r
Haha, yeah, will destroy
It's getting late here 😄
LOL, quotes made it work for me (token is random this time):
[cloud.agent]
auth_token = "Naik1NrteU8Q"
k
Oh that’s good to know! I may have to get back to you on Dask and the pods tomorrow. Will message you when I have an answer.
r
That would be great! Thanks for caring 🙏
k
Of course
j
Hi Robin, I'm not sure I fully understand your dask question. It looks like you're trying to use prefect to orchestrate some tasks, and each task might also do some dask stuff internally? In general, there's 3 ways users might mix prefect and Dask:
• Run Prefect distributed on Dask (using e.g. a `DaskExecutor`). In this case the unit of parallelization is a prefect task, where each task runs in a thread on a dask worker located somewhere. The user doesn't need to know they're running on dask, as they never touch dask apis directly.
• Run Prefect locally (using e.g. `LocalExecutor`) and use Dask stuff within a task. This is common for workflows that make use of dask apis (e.g. `dask.dataframe`, ...). In this configuration Prefect is there to orchestrate work done via dask, but any "scaling" is done within a single task. If you're using a temporary dask cluster, this is where you'd use a `resource_manager` to create/cleanup the cluster.
• Both of the above. Prefect itself is running on Dask (so your Prefect tasks are running distributed across a cluster) and some prefect tasks also make use of Dask apis directly to farm out work. See https://github.com/PrefectHQ/prefect/discussions/3022 for recommendations on how to do this.
Given the options above, I recommend using option 1 (run prefect on dask) or option 2 (run dask within prefect). Option 3 may work, but the mixed levels of parallelism can also cause hard to debug issues.
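To make the second option a bit more concrete, here is a rough sketch of the lifecycle it describes: set up a temporary compute resource, let one task fan its work out to it, then clean up. This is plain Python, not Prefect's `resource_manager` or any Dask API. A thread pool stands in for the temporary cluster, and the names `temporary_pool` and `heavy_task` are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


@contextmanager
def temporary_pool(n_workers):
    """Stand-in for a temporary cluster: created on entry, shut down on exit."""
    pool = ThreadPoolExecutor(max_workers=n_workers)  # "setup" step
    try:
        yield pool
    finally:
        pool.shutdown(wait=True)  # "cleanup" step, runs even if the work fails


def heavy_task(chunks, n_workers=4):
    """One orchestrated task that spreads its own work over the pool."""
    with temporary_pool(n_workers) as pool:
        partial_sums = list(pool.map(sum, chunks))
    return sum(partial_sums)


if __name__ == "__main__":
    print(heavy_task([[1, 2], [3, 4], [5]]))
```

The orchestrator only sees `heavy_task` as a single unit of work; any scaling happens inside it, which is the division of labour described for option 2.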
r
Thanks @Jim Crist-Harif! Our general question would be: "How does prefect envision running analyses with great memory requirements, potentially for thousands of systems?" We love the dask distributed capabilities for big data analyses and are wondering how to best leverage it with prefect. Therefore, the third option and the github discussion indeed address exactly that question 👍 Ideally, I would imagine prefect to allow both highly parallelized flows with tasks that each run on a dask-root pod of which some tasks might be computationally heavy and thus can make use of several dask-root pods per task. Hence something like the resource manager working within a flow with `KubernetesRun` config. I guess the `worker_client` could also do the trick, so will test that first thing in the morning 🙂 Thanks for the feedback!
m
Hey everyone. I just wanted to thank everyone for this discussion. We were also wondering what the best way to do this was. For now we will go with the LocalExecutor + resource manager approach, but I agree with Robin that it would be nice to have both kinds of parallelism. Can you elaborate a bit on why using worker_client could cause bugs? Would it be possible to set up a local dask cluster to run prefect tasks and a remote one to run dask collections? We've tried to do it, but it seems that we can only use one dask client for both at the same time, right? If we were able to use these two different clusters, would it help with the prefect and dask parallelism problems? Thanks
upvote 1
🙏 1