
Michael Hadorn

06/11/2021, 9:06 AM
Hi all, I cannot use the full CPU with mapped tasks on Prefect Server using the local Dask executor (threads). I have a machine running Prefect Server with 8 CPUs (on Ubuntu). There is a flow executed as a docker run with mapped CPU-intensive tasks using the local Dask executor with threads. But whatever I try, the maximum CPU peak in this flow container is 100% (taken from docker stats). I know from other containers running on the same server that the maximum is 100% per core, so in my case it should be 800%. It looks like Dask (or the container execution) is limited to only one CPU. Also, if I look in top, there is only one CPU really in use. Is there any limit on Prefect Server? Or is it Linux not spreading the threads across different CPUs?
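For context, a minimal sketch of roughly what such a setup looks like (the flow name, task body, and worker count here are illustrative, not the actual code):

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def crunch(n):
        # stand-in for a CPU-intensive, pure-Python computation
        return sum(i * i for i in range(n))

    with Flow("cpu-bound-mapping") as flow:
        crunch.map([10_000_000] * 8)

    # Thread-based local Dask executor: all threads share one Python process,
    # so CPU-bound pure-Python work is serialized by the GIL and the container
    # tops out around 100% of a single core in docker stats.
    flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=8))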
Hm. If I try Dask with processes, I get:
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
I build my flow on the fly, and I see that the flow is built in every subprocess.

Zanie

06/11/2021, 2:20 PM
When using multiprocess Dask you'll need to have your flow run in an if __name__ == '__main__': block
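A minimal sketch of that guard, assuming the flow is built at module level and run with a process-based scheduler (names are illustrative):

    if __name__ == "__main__":
        # Child processes re-import this module when they are spawned; keeping
        # the flow run under this guard stops them from starting the flow again,
        # which is what raises the bootstrapping RuntimeError above.
        flow.run(executor=LocalDaskExecutor(scheduler="processes"))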
As to your lack of multiple CPU usage, I'm not sure what's up. That sounds like a Dask/Python thing. Prefect Server does not impose any limits.

Michael Hadorn

06/11/2021, 2:28 PM
@Zanie Thanks! For now I got something working with the DaskExecutor instead of the LocalDaskExecutor. It also starts the way I want, but then one task gives me this. As far as I understand, this task has no return value or anything, and for sure no DB connection as a return. Do you know anything about this?
Unexpected error: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function Tasks.join_checks at 0x7fb04870c040>: it's not the same object as cdwhprefect.model.entity_transformation_impl.join_checks.Tasks.join_checks

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 393, in submit
    fut = self.client.submit(fn, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1586, in submit
    futures = self._graph_to_futures(
  File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 955, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/usr/local/lib/python3.8/site-packages/dask/highlevelgraph.py", line 392, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "/usr/local/lib/python3.8/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 3560, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/usr/local/lib/python3.8/site-packages/distributed/worker.py", line 3572, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object

Zanie

06/11/2021, 2:46 PM
It looks like you must be returning that sqlalchemy object somehow
Perhaps it's attached to the Task object?

Michael Hadorn

06/11/2021, 2:52 PM
@Zanie Hm, not sure if I understood correctly. What does "attached to a task" mean? It's just a function with the task decorator, and the return value of the task is a simple True. Could it be that the input of this task is the problem (this task has a lot of upstreams)? Not because of their return values, only to wait until these other tasks are processed.

Zanie

06/11/2021, 2:55 PM
Attached to the task would only be relevant if you were using a Task class, so never mind.
It could be attempting to serialize the output from an upstream, yeah, I'm not sure. This is a weird error that you generally shouldn't see with Dask.
I'd make sure none of your upstreams are returning sqlalchemy objects either
It's also possible it's just sitting around as a global variable that dask is trying to package?
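A sketch of the pattern being suggested here, with an illustrative connection URL and task body (the real join_checks implementation is not shown in the thread): create the sqlalchemy engine inside the task, so that no sqlalchemy object has to travel through task inputs, outputs, or captured globals.

    from prefect import task
    from sqlalchemy import create_engine

    @task
    def join_checks(table_name):
        # The engine lives and dies inside the task, so only the string argument
        # and the bool return value ever need to be pickled by Dask.
        engine = create_engine("postgresql://user:pass@host/db")  # illustrative URL
        with engine.connect() as conn:
            conn.execute("SELECT 1")  # placeholder for the real check
        engine.dispose()
        return True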

Michael Hadorn

06/11/2021, 3:04 PM
Hm. Even if I reduce my flow to just this single task (it's a really easy one), the problem appears. So all the upstreams (only some parameters) are gone now.

Zanie

06/11/2021, 3:08 PM
Can you share your code for this single-task flow?

Michael Hadorn

06/11/2021, 3:14 PM
Trying to build a standalone flow for this problem.
I found the error. Like the error message says, there was a sqlalchemy object that was not closed, but should have been. Thanks a lot for your hints! Now it's working.
Ah, for the record: if we use only threads, CPU work is still done on one CPU. To use more than one, I changed from the local Dask executor to the Dask executor. Then it works.
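For completeness, a sketch of that change (the cluster sizing is illustrative): a DaskExecutor backed by a local multi-process cluster, so mapped CPU-bound tasks can spread across all cores.

    from prefect.executors import DaskExecutor

    # Starts a local distributed cluster with one single-threaded worker process
    # per core, so mapped CPU-bound tasks are no longer pinned to a single CPU.
    flow.executor = DaskExecutor(
        cluster_kwargs={"n_workers": 8, "threads_per_worker": 1}
    )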