Hello everyone, I'm facing an issue using Prefect...
# ask-community
r
Hello everyone, I'm facing an issue using Prefect with a Dask task runner, although the issue may be more linked to how Prefect works and how I use it. See the minimal example of what I would like to do in the thread. I created the `AsCompleted` class based on `prefect.futures.as_completed` in order to be able to add new futures at any time (it works great as far as I can tell). This code works with the `ThreadPoolTaskRunner`:
Starting task with argument: 3
Starting task with argument: 2
Starting task with argument: 1
10:21:51.238 | INFO    | Task run 'run-d0d' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274aa0> finished with 1
10:21:52.237 | INFO    | Task run 'run-f40' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274a10> finished with 2
10:21:53.237 | INFO    | Task run 'run-9cb' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274980> finished with 3
Launching followup
Starting task with argument: 2
10:21:55.305 | INFO    | Task run 'run-d92' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17d7dd90> finished with 2
10:21:55.509 | INFO    | Flow run 'elastic-toucan' - Finished in state Completed()
The issue is with the `DaskTaskRunner`. `dask` needs to pickle the flow, and will thus pickle the `MainObj` instance, even though it is not picklable, nor would it be reasonable to pickle (which I represented with `large_object`). `dask` tries `pickle` and `cloudpickle`, but both fail, with `TypeError: cannot pickle '_thread.lock' object`, `_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run` and `TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')`. I tried to move the flow out of the object, but I always have something unpicklable in the flow which causes a similar error. Is there any way to handle this kind of problem? Any help would be a lifesaver! Thanks a lot in advance!
Some things that I can't change:
- The `DaskTaskRunner` is necessary: there isn't any other supported task runner which can replace it (interfacing with SLURM and limiting tasks based on custom resources).
- `process_result` needs to have access to the `MainObj` instance, since it updates its fields based on the task result.
- Tasks need to be immediately launched based on the result of completing tasks, hence the necessity for `AsCompleted`.
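The mechanism described above can be reproduced with the standard library alone. The following is a minimal sketch, not Prefect or Dask code: `Holder` and `try_pickle` are hypothetical names, and `Holder` merely stands in for `MainObj`. It shows that pickling a bound method also pickles the instance it is bound to, so any unpicklable attribute on that instance makes the whole call fail.

```python
import pickle
import threading


class Holder:
    """Hypothetical stand-in for MainObj: carries state that cannot be pickled."""

    def __init__(self):
        self.lock = threading.Lock()

    def run(self):
        return "done"


def try_pickle(obj):
    """Return None if `obj` pickles, otherwise the exception that was raised."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # pickle can raise several exception types
        return exc
    return None


holder = Holder()

# A bound method is pickled as "look up `run` on this instance",
# so the instance (including its lock) is serialized along with it.
error = try_pickle(holder.run)
print(type(error).__name__, error)

# Plain data pickles without involving the instance at all.
print(try_pickle({"arg": 3}))
```

This appears consistent with the traceback, where the last failure reported is the `_thread.lock` rather than anything in the task arguments.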
Here is the log (with the distributed.* parts shortened for brevity):
10:24:02.592 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
10:24:02.839 | INFO    | distributed.scheduler - State start
10:24:02.842 | INFO    | distributed.scheduler -   Scheduler at:     tcp://127.0.0.1:46117
...
10:24:04.462 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:45409/status
10:24:05.150 | ERROR   | distributed.protocol.pickle - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>
 0. 18708634110400
>.
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object
10:24:05.152 | ERROR   | Flow run 'piquant-jaguarundi' - Encountered exception during execution: TypeError('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/tmp/prefect/min_desired.py", line 100, in run
    futures = [task.run.submit() for task in tasks]
               ^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/tasks.py", line 1163, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 2165, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 3355, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
10:24:05.284 | INFO    | distributed.scheduler - Remove client PrefectDaskClient-74c6ce25-914a-11ef-840e-2cf05dd99b37
10:24:05.285 | INFO    | distributed.core - Received 'close-stream' from tcp://127.0.0.1:51090; closing.
...
10:24:05.750 | ERROR   | Flow run 'piquant-jaguarundi' - Finished in state Failed("Flow run encountered an exception: TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\\n 0. 18708634110400\\n>')")
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/raphael/tmp/prefect/min_desired.py", line 118, in <module>
    obj.run()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flows.py", line 1345, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/tmp/prefect/min_desired.py", line 100, in run
    futures = [task.run.submit() for task in tasks]
               ^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/tasks.py", line 1163, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 2165, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 3355, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
Here is the minimal example of what I want to do which I refer to:
import time
import numpy as np
import threading

from typing import List, Optional, Set
from typing_extensions import TypeVar

from prefect.futures import PrefectFuture
from prefect.utilities.timeout import timeout as timeout_context
from prefect import flow, task

from prefect_dask import DaskTaskRunner
from prefect.task_runners import ThreadPoolTaskRunner

F = TypeVar("F")
R = TypeVar("R")


class AsCompleted:
    def __init__(
        self, futures: List[PrefectFuture[R]], timeout: Optional[float] = None
    ):
        self.unique_futures: Set[PrefectFuture[R]] = set(futures)
        self.total_futures = len(self.unique_futures)
        self.timeout = timeout

        # Use sets (not dicts) so these match how they are used in __iter__ and add
        self.done = set()
        self.pending = set()
        self.finished_futures = []

    def add_to_done(self, future):
        with self.finished_lock:
            self.finished_futures.append(future)
            self.finished_event.set()

    def __iter__(self):
        try:
            with timeout_context(self.timeout):
                self.done = {f for f in self.unique_futures if f._final_state}
                self.pending = self.unique_futures - self.done

                yield from self.done

                self.finished_event = threading.Event()
                self.finished_lock = threading.Lock()

                for future in self.pending:
                    future.add_done_callback(self.add_to_done)

                while self.pending:
                    self.finished_event.wait()
                    with self.finished_lock:
                        self.done = self.finished_futures
                        self.finished_futures = []
                        self.finished_event.clear()

                    for future in self.done:
                        self.pending.remove(future)
                        yield future

        except TimeoutError:
            raise TimeoutError(
                "%d (of %d) futures unfinished"
                % (len(self.pending), self.total_futures)
            )

    def add(self, fut):
        with self.finished_lock:
            self.unique_futures.add(fut)
            self.pending.add(fut)
            self.total_futures += 1
            fut.add_done_callback(self.add_to_done)

    def count(self):
        return len(self.unique_futures)


class TaskObj:
    def __init__(self, arg):
        self.arg = arg
        self.result = None

    @task
    def run(self):
        print(f"Starting task with argument: {self.arg}")
        time.sleep(self.arg)
        self.result = self.arg
        return self


class MainObj:
    def __init__(self):
        self.large_object = np.random.random((10000, 10000))
        self.lock = threading.Lock()

    #@flow(task_runner=DaskTaskRunner())
    @flow(task_runner=ThreadPoolTaskRunner)
    def run(self):
        tasks = [TaskObj(1), TaskObj(2), TaskObj(3)]
        futures = [task.run.submit() for task in tasks]

        self.asc = AsCompleted(futures)

        for fut in self.asc:
            res = fut.result()
            self.process_result(res)

    def process_result(self, task):
        print(f"Task {task} finished with {task.result}")
        if task.result > 2:
            print("Launching followup")
            followup_job = TaskObj(task.result - 1)
            self.asc.add(followup_job.run.submit())


if __name__ == "__main__":
    obj = MainObj()
    obj.run()
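For comparison, the "add new futures while iterating" behaviour that `AsCompleted` provides can be sketched with `concurrent.futures` from the standard library. This is only an illustration of the pattern under a thread pool, not a replacement for the Prefect-based class above; `job` and `run_with_followups` are hypothetical names, and the sleep is scaled down so the sketch runs quickly.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def job(arg):
    """Hypothetical unit of work: sleep briefly, then return the argument."""
    time.sleep(arg * 0.01)
    return arg


def run_with_followups(args):
    """Handle results as they complete, submitting follow-up jobs along the way."""
    processed = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = {pool.submit(job, a) for a in args}
        while pending:
            # Block until at least one future finishes, then split the set.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                result = fut.result()
                processed.append(result)
                if result > 2:
                    # Follow-up work simply joins the pending set.
                    pending.add(pool.submit(job, result - 1))
    return processed


results = run_with_followups([1, 2, 3])
print(sorted(results))
```

The loop keeps all bookkeeping in the calling thread, so no lock or event is needed; the trade-off is that each pass through `wait` rescans the pending set.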
Also, similar unresolved issue: https://github.com/PrefectHQ/prefect/issues/12083
b
Hey Raphael, thanks for your post! Can you move your script into the thread as well? It's a pretty big wall of text to have in the main channel. Please & ty 🙇
r
Hello Bianca, thanks for the reply! I moved the script to my second reply 🙂
n
hi @Raphaël Robidas - this would be a good candidate for a discussion! ideally, a minimal version of what you're doing above and why it's necessary would be helpful
in particular, I don't understand the need for the extra abstraction `TaskObj` - this seems potentially related to your pickling woes?
r
Hello @Nate, thanks for the reply! You mean that you suggest an even smaller minimal example? As for `TaskObj`, in my real codebase, I have a bunch of job classes which inherit from an abstract base class which resembles `TaskObj`. They are used to keep track of the state of the results, as there are several subtasks, and also to make handling the individual job logic easier. I made certain that these objects can be pickled fine (and indeed had to work a bit to reach that point). So the error `Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run` may be related to the apparent class path at pickling time; I could not find a way to bypass it.
n
"You mean that you suggest an even smaller minimal example?"
ideally, yes! that would help us focus on which part of prefect's own implementation is related to the trouble you're seeing, and which parts might be idiosyncratic detail. at a glance, it appears that the pickling issues have to do with your use of `Lock`. feel free to correct me if you think i'm missing something
for example:
# /// script
# dependencies = [
#     "prefect-dask",
# ]
# ///


import threading

from prefect_dask import DaskTaskRunner

from prefect import flow, task

# Minimal reproduction of unpicklable state
lock = threading.Lock()


@task
def my_task():
    # Use lock to demonstrate pickle error
    with lock:
        return "hello"


@flow(task_runner=DaskTaskRunner())
def my_flow():
    return my_task.submit()


if __name__ == "__main__":
    my_flow()
» uv run sandbox/repros/pickling_lock.py
yields the same error
TypeError: cannot pickle '_thread.lock' object
I'm not sure I would expect to be able to pickle a lock in the first place, which you have to do because you're using dask
r
Yes, I added the lock object to make the code crash if `MainObj` is pickled. It shouldn't be pickled even if it can be, since it's large and not relevant to the task. Here is a smaller example:
import time
import numpy as np
import threading

from prefect import flow, task
from prefect.futures import as_completed
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ThreadPoolTaskRunner


class TaskObj:
    def __init__(self, arg):
        self.arg = arg
        self.result = None

    @task
    def run(self):
        print(f"Starting task with argument: {self.arg}")
        time.sleep(self.arg)
        self.result = self.arg
        return self


class MainObj:
    def __init__(self):
        # This object should not be pickled
        self.large_object = np.random.random((10000, 10000))
        self.lock = threading.Lock()

    #@flow(task_runner=ThreadPoolTaskRunner)
    @flow(task_runner=DaskTaskRunner())
    def run(self):
        tasks = [TaskObj(1), TaskObj(2), TaskObj(3)]
        futures = [task.run.submit() for task in tasks]

        for fut in as_completed(futures):
            res = fut.result()
            self.process_result(res)

    def process_result(self, task):
        print(f"Task {task} finished with {task.result}")
        if task.result > 2:
            # Would normally update `MainObj` with the result
            pass


if __name__ == "__main__":
    obj = MainObj()
    obj.run()
With the Dask task runner, `MainObj` is pickled here, which is problematic. If `run` calls a flow outside of the class, then the flow cannot modify the object fields, which is pretty much necessary.
n
I would recommend making your task a pure function instead of an instance method, so you don't have to worry about pickling the state on your class
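One way to read this suggestion, sketched with the standard library only rather than Prefect or Dask: the worker function takes and returns plain data, and the stateful object stays in the calling process. `run_job`, `send_to_worker`, and `Coordinator` are hypothetical names, and `send_to_worker` is an assumption made for illustration: it only round-trips the argument and the result through `pickle` to emulate what would have to cross a process boundary. A real distributed runner would serialize more than that, including the function itself.

```python
import pickle
import threading


def run_job(arg):
    """Pure function: depends only on its argument and returns plain data."""
    return arg * 2


def send_to_worker(func, arg):
    """Hypothetical stand-in for a remote call: pickle the argument and the result."""
    remote_arg = pickle.loads(pickle.dumps(arg))
    return pickle.loads(pickle.dumps(func(remote_arg)))


class Coordinator:
    """Holds unpicklable state and never leaves the calling process."""

    def __init__(self):
        self.lock = threading.Lock()
        self.results = []

    def process_result(self, value):
        # Update local state from the plain value returned by the job.
        with self.lock:
            self.results.append(value)


coordinator = Coordinator()
for arg in (1, 2, 3):
    coordinator.process_result(send_to_worker(run_job, arg))

print(coordinator.results)
```

In this arrangement the lock is never reachable from anything that gets pickled, because `run_job` holds no reference to `Coordinator`.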
r
Also, I initially wanted to have `TaskObj.run` be the flow, but apparently you can't queue a bunch of flows like you can with tasks.
Even if the task is only a pure function, it still crashes with the same error. `MainObj` is being pickled because the Flow object refers to it (I ran tests to identify exactly what wasn't picklable and where it came from). Is it possible either to refer to the flow in a "shallow" way (i.e., without the containing class) or to run tasks outside of a flow? For my use, tasks and subtasks are as good as flows and tasks.
I saw that it's possible to do `task.delay`, but then the task runner isn't being set to the Dask one.
n
sorry, i can spend some time looking at this in a bit. are you willing to open a discussion that summarizes the issue here? that way others can discover and benefit from the discussion
r
Alright, sure thing, thanks!
n
thank you!
r
Hold on, are flows actually run on the task runners? I was under the impression that flows run in the calling process and only tasks get dispatched to task runners. However, in debugging, it seems that the flow function gets sent to Dask, which is the source of the issue.
Hello @Nate, any preliminary thoughts on this? Am I doing something obviously wrong, or will it require deeper investigation? Thanks a lot!
n
hi @Raphaël Robidas - i responded with a working example that should be a useful reference