Robin
05/05/2021, 8:01 PM
resource manager related question:
I am trying to set up a resource manager to do some larger-than-memory computations within a task.
I wanted to start simple, but even the example gives the following error for me:
multiprocessing\spawn.py", line 134, in _check_not_importing_main
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Kevin Kho
if __name__ == "__main__":
Kevin Kho
Kevin Kho
05/05/2021, 8:08 PM
Hey again @Robin! Just a friendly reminder to move logs and large code blocks to threads, to keep the main channel clean.
Robin
05/05/2021, 8:09 PM
Sure! I thought a code snippet would be OK, but agree that it is even cleaner when they are in the thread 👍
Kevin Kho
Kevin Kho
Adam Brusselback
05/05/2021, 8:12 PM
Robin
05/05/2021, 8:12 PM
Kevin Kho
Kevin Kho
Kevin Kho
if __name__ == "__main__":
I can't imagine why, but do watch out for it.
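For reference, a minimal sketch of the guard being suggested, assuming a Prefect 0.14-style script (the flow and task names here are placeholders): on Windows, multiprocessing uses the spawn start method, which re-imports the main module in each child process, so any code that actually runs the flow must sit behind the guard.

from prefect import Flow, task

@task
def say_hello():
    print("hello")

with Flow("guard-example") as flow:
    say_hello()

if __name__ == "__main__":
    # Without this guard, spawn-based multiprocessing re-executes the
    # module in each child process and raises the RuntimeError above.
    flow.run()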
Robin
05/05/2021, 8:22 PM
05/05/2021, 8:27 PMprefect agent local start
to run it via prefect cloud on the local machine, but getting prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
Sounds like an outdated error message, now that service accounts replace api tokens?
2. How to reconfigure the resource manager to work within a KubernetesRun
?
3. How to setup the resource manager such that the related dask cluster can actually run distributed on many kubernetes pods?Kevin Kho
Robin
05/05/2021, 8:32 PM
Kevin Kho
Robin
05/05/2021, 8:32 PM
Robin
05/05/2021, 8:33 PM
pip list
Kevin Kho
Kevin Kho
Robin
05/05/2021, 8:41 PM
elif executor == "dask":
    # Create Dask Executor backed by an ephemeral dask-kubernetes cluster
    # (make_pod_spec/KubeCluster come from dask_kubernetes; the resource
    # limits are defined earlier in the script)
    def make_cluster(n_workers, image=None):
        """Start a cluster using the same image as the flow run"""
        pod_spec = make_pod_spec(
            image=image or prefect.context.image,  # default to current active image
            memory_limit=memory_limit,
            memory_request=memory_request,
            cpu_limit=cpu_limit,
            cpu_request=cpu_request,
        )
        return KubeCluster(pod_spec)

    flow.executor = DaskExecutor(
        cluster_class=make_cluster,
        adapt_kwargs={"minimum": 0, "maximum": n_workers},
    )
However, in this case I would like to get several pods to work on the computation for a single task; does that make sense?
If I understood the DaskExecutor correctly, the above example runs each task on one dask worker, i.e. within one pod of the dask-kubernetes cluster?
But we have single analyses that require e.g. 200 GB of RAM. I would therefore like to run such an analysis with dask, so it can be done on pods of e.g. 4 GB memory, and running it distributed across many 4 GB pods would be great.
I thought this was possible via the resource manager, but am also totally open to other solutions.
What the question comes down to:
• How does prefect envision intra-task parallelism?
• (as opposed to inter-task parallelism = task.map, where each task runs on one kubernetes pod; see the sketch below)
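For contrast, a minimal sketch of the inter-task parallelism mentioned in the last bullet (task names are placeholders): with task.map, each element becomes its own task run, which the executor can schedule on a separate dask worker/pod.

from prefect import Flow, task

@task
def get_items():
    return list(range(10))

@task
def process(item):
    return item * 2

with Flow("map-example") as flow:
    items = get_items()
    process.map(items)  # one task run per element, parallelized by the executor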
Kevin Kho
Robin
05/05/2021, 8:46 PM
Robin
05/05/2021, 8:47 PM
My config.toml, just containing
[cloud.agent]
auth_token = PV7LJn6u7ersIFQ
which returns the following error when invoking prefect agent local start
toml.decoder.TomlDecodeError: This float doesn't have a leading digit (line 2 column 1 char 14)
I am running on Windows currently, but could also test on Apple M1 🤷
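For reference, the decode error comes from the unquoted value: bare strings are not valid TOML, so the parser tries to read the token as a number and fails. Quoting the value, as in the working config below, fixes it:

[cloud.agent]
# TOML string values must be quoted
auth_token = "PV7LJn6u7ersIFQ"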
Kevin Kho
Robin
05/05/2021, 8:49 PM
Robin
05/05/2021, 8:49 PM
Robin
05/05/2021, 9:01 PM
[cloud.agent]
auth_token = "Naik1NrteU8Q"
Kevin Kho
Robin
05/05/2021, 9:04 PM
Kevin Kho
Jim Crist-Harif
05/05/2021, 9:46 PM
• Prefect itself is running on Dask (via a DaskExecutor), so your Prefect tasks run distributed across a cluster.
• Prefect is not running on Dask (e.g. using a LocalExecutor) and you use Dask stuff within a task. This is common for workflows that make use of dask apis (e.g. dask.dataframe, ...). In this configuration Prefect is there to orchestrate work done via dask, but any "scaling" is done within a single task. If you're using a temporary dask cluster, this is where you'd use a resource_manager to create/cleanup the cluster (see the sketch below).
• Both of the above. Prefect itself is running on Dask (so your Prefect tasks are running distributed across a cluster) and some prefect tasks also make use of Dask apis directly to farm out work. See https://github.com/PrefectHQ/prefect/discussions/3022 for recommendations on how to do this.
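A minimal sketch of the resource_manager pattern from the second bullet, adapted from the Prefect docs (the cluster here is a local one for simplicity; the task body is a placeholder):

from dask.distributed import Client
from prefect import Flow, Parameter, resource_manager, task

@resource_manager
class DaskCluster:
    """Create a temporary dask cluster for the duration of the block."""
    def __init__(self, n_workers):
        self.n_workers = n_workers

    def setup(self):
        # Start the cluster and return a client for tasks to use
        return Client(n_workers=self.n_workers)

    def cleanup(self, client):
        # Runs even if tasks inside the block fail
        client.close()

@task
def do_dask_work(client):
    # Farm out work to the cluster from within a single Prefect task
    return client.submit(sum, range(100)).result()

with Flow("resource-manager-example") as flow:
    n_workers = Parameter("n_workers", default=4)
    with DaskCluster(n_workers=n_workers) as client:
        do_dask_work(client)

To run the temporary cluster on kubernetes instead, setup() would construct a dask_kubernetes KubeCluster (as in the snippet earlier in the thread) rather than a local Client.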
Jim Crist-Harif
05/05/2021, 9:48 PM
Robin
05/05/2021, 10:23 PM
KubernetesRun config.
I guess the worker_client could also do the trick, so I will test that first thing in the morning 🙂
Thanks for the feedback!
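A sketch of the worker_client approach, assuming the flow runs on a DaskExecutor (load_and_process and the paths argument are hypothetical): a task executing on a dask worker can reconnect to the cluster it is running on and submit sub-work to it, so a single large analysis can be spread across many small pods.

from dask.distributed import worker_client
from prefect import task

def load_and_process(path):
    # Hypothetical per-chunk computation
    return len(path)

@task
def analyze_large_dataset(paths):
    # This task runs on a dask worker; worker_client reconnects to the
    # same cluster so the sub-computations spread across all workers/pods.
    with worker_client() as client:
        futures = client.map(load_and_process, paths)
        return client.gather(futures)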
Milton Tavares Neto
05/06/2021, 10:29 PM