Raphaël Robidas
10/23/2024, 2:38 PM
`prefect.futures.as_completed` in order to be able to add new futures at any time (it works great as far as I can tell).
This code works with the `ThreadPoolTaskRunner`:
```
Starting task with argument: 3
Starting task with argument: 2
Starting task with argument: 1
10:21:51.238 | INFO | Task run 'run-d0d' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274aa0> finished with 1
10:21:52.237 | INFO | Task run 'run-f40' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274a10> finished with 2
10:21:53.237 | INFO | Task run 'run-9cb' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274980> finished with 3
Launching followup
Starting task with argument: 2
10:21:55.305 | INFO | Task run 'run-d92' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17d7dd90> finished with 2
10:21:55.509 | INFO | Flow run 'elastic-toucan' - Finished in state Completed()
```
The issue is with the `DaskTaskRunner`. `dask` needs to pickle the flow, and will thus pickle the `MainObj` instance, even though it is not picklable and it would not be reasonable to pickle it anyway (the `large_object` attribute stands in for that). `dask` tries `pickle` and `cloudpickle`, but both fail, with `TypeError: cannot pickle '_thread.lock' object`, `_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run` and `TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')`.
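The core mechanism can be reproduced with the standard library alone: pickling a bound method also pickles the instance it is bound to, so anything that instance holds (a lock, a large array) ends up in the payload. A minimal sketch, where `Holder` and `LightHolder` are hypothetical stand-ins for `MainObj`; it uses plain `pickle` only and does not involve Prefect or Dask, whose serialization path goes through `cloudpickle` and may pull in further references:

```python
import pickle
import threading


class Holder:
    """Hypothetical stand-in for MainObj: it owns a lock, which pickle rejects."""

    def __init__(self):
        self.lock = threading.Lock()

    def run(self):
        return "ran"


class LightHolder:
    """Same method, but the instance carries no unpicklable state."""

    def run(self):
        return "ran"


def try_pickle(obj):
    """Return None if `obj` pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


holder = Holder()
# A bound method is pickled as getattr(instance, "run"), so the whole
# instance, lock included, has to be pickled along with it.
print(try_pickle(holder.run))

# Without the lock, the same kind of bound method pickles fine.
print(try_pickle(LightHolder().run))
```

The takeaway is that whatever callable gets shipped to the workers should not close over the stateful object, because serializing the callable means serializing that object.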
I tried to move the flow out of the object, but there is always something unpicklable in the flow, which causes a similar error. Is there any way to handle this kind of problem? Any help would be a lifesaver! Thanks a lot in advance!
Some things that I can't change:
- The `DaskTaskRunner` is necessary; no other supported task runner can replace it (interfacing with SLURM and limiting tasks based on custom resources).
- `process_result` needs access to the `MainObj` instance, since it updates its fields based on the task result.
- Tasks need to be launched immediately based on the results of completed tasks, hence the need for `AsCompleted`.
Raphaël Robidas
10/23/2024, 2:39 PM
```
10:24:02.592 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
10:24:02.839 | INFO | distributed.scheduler - State start
10:24:02.842 | INFO | distributed.scheduler - Scheduler at: tcp://127.0.0.1:46117
...
10:24:04.462 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:45409/status
10:24:05.150 | ERROR | distributed.protocol.pickle - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>
 0. 18708634110400
>.
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object
10:24:05.152 | ERROR | Flow run 'piquant-jaguarundi' - Encountered exception during execution: TypeError('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/tmp/prefect/min_desired.py", line 100, in run
    futures = [task.run.submit() for task in tasks]
               ^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/tasks.py", line 1163, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 2165, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 3355, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
10:24:05.284 | INFO | distributed.scheduler - Remove client PrefectDaskClient-74c6ce25-914a-11ef-840e-2cf05dd99b37
10:24:05.285 | INFO | distributed.core - Received 'close-stream' from tcp://127.0.0.1:51090; closing.
...
10:24:05.750 | ERROR | Flow run 'piquant-jaguarundi' - Finished in state Failed("Flow run encountered an exception: TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\\n 0. 18708634110400\\n>')")
Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/raphael/<...>/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/raphael/tmp/prefect/min_desired.py", line 118, in <module>
    obj.run()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flows.py", line 1345, in __call__
    return run_flow(
           ^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 821, in run_flow
    return run_flow_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 701, in run_flow_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 255, in result
    raise self._raised
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 655, in run_context
    yield self
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 699, in run_flow_sync
    engine.call_flow_fn()
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/flow_engine.py", line 678, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/tmp/prefect/min_desired.py", line 100, in run
    futures = [task.run.submit() for task in tasks]
               ^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect/tasks.py", line 1163, in submit
    future = task_runner.submit(self, parameters, wait_for)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/task_runners.py", line 353, in submit
    future = self._client.submit(
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/prefect_dask/client.py", line 64, in submit
    future = super().submit(
             ^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 2165, in submit
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/client.py", line 3355, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/raphael/<...>/lib/python3.12/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
```
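To narrow down which attribute is responsible for an error like the one above, one rough approach is to try pickling an object and, when that fails, recurse into its attributes and container items. A stdlib-only sketch; `find_unpicklable` is a hypothetical helper (not part of Prefect or Dask), it re-pickles every subtree and so can be slow on large objects, and plain `pickle` may disagree with `cloudpickle` about functions and classes defined in `__main__`:

```python
import pickle
import threading


def find_unpicklable(obj, path="obj", max_depth=3):
    """Return paths of sub-objects of `obj` that fail to pickle.

    Only walks instance __dict__ entries, list/tuple items and dict values,
    so it is a rough diagnostic rather than an exhaustive search.
    """
    try:
        pickle.dumps(obj)
        return []  # this whole subtree pickles fine
    except Exception:
        pass
    if max_depth == 0:
        return [path]
    if isinstance(obj, dict):
        children = [(f"{path}[{k!r}]", v) for k, v in obj.items()]
    elif isinstance(obj, (list, tuple)):
        children = [(f"{path}[{i}]", v) for i, v in enumerate(obj)]
    elif hasattr(obj, "__dict__"):
        children = [(f"{path}.{k}", v) for k, v in vars(obj).items()]
    else:
        children = []
    bad = []
    for child_path, child in children:
        bad.extend(find_unpicklable(child, child_path, max_depth - 1))
    # If no child is to blame, report the object itself.
    return bad or [path]


class Inner:
    def __init__(self):
        self.lock = threading.Lock()
        self.value = 42


class Outer:
    def __init__(self):
        self.inner = Inner()
        self.items = [1, 2, threading.Lock()]
        self.name = "fine"


print(find_unpicklable(Outer()))
```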
Raphaël Robidas
10/23/2024, 2:45 PM
```python
import time
import numpy as np
import threading
from typing import List, Optional, Set
from typing_extensions import TypeVar
from prefect.futures import PrefectFuture
from prefect.utilities.timeout import timeout as timeout_context
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ThreadPoolTaskRunner

F = TypeVar("F")
R = TypeVar("R")


class AsCompleted:
    """Like prefect.futures.as_completed, but futures can be added while iterating."""

    def __init__(
        self, futures: List[PrefectFuture[R]], timeout: Optional[float] = None
    ):
        self.unique_futures: Set[PrefectFuture[R]] = set(futures)
        self.total_futures = len(self.unique_futures)
        self.timeout = timeout
        self.done = set()
        self.pending: Set[PrefectFuture[R]] = set()
        self.finished_futures = []
        # Created up front so that add() also works while the
        # already-finished futures are being yielded.
        self.finished_event = threading.Event()
        self.finished_lock = threading.Lock()

    def add_to_done(self, future):
        # Done-callback: may be called from a worker thread.
        with self.finished_lock:
            self.finished_futures.append(future)
            self.finished_event.set()

    def __iter__(self):
        try:
            with timeout_context(self.timeout):
                self.done = {f for f in self.unique_futures if f._final_state}
                self.pending = self.unique_futures - self.done
                # Register callbacks before yielding, so a future passed to
                # add() during the yield below is not registered twice.
                for future in self.pending:
                    future.add_done_callback(self.add_to_done)
                yield from self.done
                while self.pending:
                    self.finished_event.wait()
                    with self.finished_lock:
                        self.done = self.finished_futures
                        self.finished_futures = []
                        self.finished_event.clear()
                    for future in self.done:
                        self.pending.remove(future)
                        yield future
        except TimeoutError:
            raise TimeoutError(
                "%d (of %d) futures unfinished"
                % (len(self.pending), self.total_futures)
            )

    def add(self, fut):
        with self.finished_lock:
            self.unique_futures.add(fut)
            self.pending.add(fut)
            self.total_futures += 1
        fut.add_done_callback(self.add_to_done)

    def count(self):
        return len(self.unique_futures)


class TaskObj:
    def __init__(self, arg):
        self.arg = arg
        self.result = None

    @task
    def run(self):
        print(f"Starting task with argument: {self.arg}")
        time.sleep(self.arg)
        self.result = self.arg
        return self


class MainObj:
    def __init__(self):
        self.large_object = np.random.random((10000, 10000))
        self.lock = threading.Lock()

    # @flow(task_runner=DaskTaskRunner())
    @flow(task_runner=ThreadPoolTaskRunner)
    def run(self):
        tasks = [TaskObj(1), TaskObj(2), TaskObj(3)]
        futures = [task.run.submit() for task in tasks]
        self.asc = AsCompleted(futures)
        for fut in self.asc:
            res = fut.result()
            self.process_result(res)

    def process_result(self, task):
        print(f"Task {task} finished with {task.result}")
        if task.result > 2:
            print("Launching followup")
            followup_job = TaskObj(task.result - 1)
            self.asc.add(followup_job.run.submit())


if __name__ == "__main__":
    obj = MainObj()
    obj.run()
```
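The pattern `AsCompleted` implements (yield futures as they finish, while allowing new ones to be added mid-iteration) can be sketched without Prefect, using `concurrent.futures` and a `queue.Queue` fed by done-callbacks. This is a simplified stdlib analogue for illustration only, not a drop-in replacement for Prefect futures; `DynamicAsCompleted` and `job` are hypothetical names, and the sketch assumes `add` is only called from the thread that iterates:

```python
import queue
import time
from concurrent.futures import Future, ThreadPoolExecutor


class DynamicAsCompleted:
    """Yield futures as they finish; more may be added during iteration."""

    def __init__(self, futures=()):
        self._done = queue.Queue()  # thread-safe; filled by done-callbacks
        self._pending = 0  # only touched by the iterating thread
        for fut in futures:
            self.add(fut)

    def add(self, fut: Future) -> None:
        self._pending += 1
        # Runs immediately if fut has already finished, otherwise
        # on the worker thread that completes it.
        fut.add_done_callback(self._done.put)

    def __iter__(self):
        while self._pending:
            fut = self._done.get()  # blocks until some future completes
            self._pending -= 1
            yield fut


def job(arg):
    time.sleep(arg / 100)  # scaled-down stand-in for TaskObj.run
    return arg


processed = []
with ThreadPoolExecutor(max_workers=4) as pool:
    asc = DynamicAsCompleted(pool.submit(job, a) for a in (1, 2, 3))
    for fut in asc:
        result = fut.result()
        processed.append(result)
        if result > 2:  # same follow-up rule as process_result above
            asc.add(pool.submit(job, result - 1))

print(sorted(processed))
```

Using a queue instead of an `Event` plus a lock keeps the bookkeeping in one place: callbacks only enqueue, and only the consuming loop changes the pending count.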
Also, a similar unresolved issue: https://github.com/PrefectHQ/prefect/issues/12083
Bianca Hoch
10/23/2024, 6:47 PM
Raphaël Robidas
10/23/2024, 6:53 PM
Nate
10/24/2024, 6:02 PM
Nate
10/24/2024, 6:04 PM
`TaskObj` - this seems potentially related to your pickling woes?
Raphaël Robidas
10/24/2024, 7:15 PM
`TaskObj`, in my real codebase, I have a bunch of job classes which inherit from an abstract base class that resembles `TaskObj`. They are used to keep track of the state of the results, as there are several subtasks, and also to make the individual job logic easier to handle. I made certain that these objects can be pickled fine (and indeed had to work a bit to reach that point). So the error `Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run` may be related to the apparent class path at pickling time; I could not find a way to bypass it.
Nate
10/24/2024, 7:19 PM
> You mean that you suggest an even smaller minimal example?

ideally, yes! that would help us focus on which parts of Prefect's own implementation are related to the trouble you're seeing, and which parts might be idiosyncratic detail. at a glance, it appears that the pickling issues have to do with your use of `Lock`
Nate
10/24/2024, 7:20 PM
Nate
10/24/2024, 7:30 PM
```python
# /// script
# dependencies = [
#   "prefect-dask",
# ]
# ///
import threading

from prefect_dask import DaskTaskRunner
from prefect import flow, task

# Minimal reproduction of unpicklable state
lock = threading.Lock()


@task
def my_task():
    # Use lock to demonstrate pickle error
    with lock:
        return "hello"


@flow(task_runner=DaskTaskRunner())
def my_flow():
    return my_task.submit()


if __name__ == "__main__":
    my_flow()
```
Running `» uv run sandbox/repros/pickling_lock.py` yields the same error: `TypeError: cannot pickle '_thread.lock' object`
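For objects that genuinely have to travel to the workers and own a lock (such as the job classes described above), a common Python-level workaround is to leave the lock out of the pickled state with `__getstate__` and recreate it in `__setstate__`. A minimal sketch with a hypothetical `Job` class; it does not help with a module-level lock captured by a task function, as in the repro, and it does not address the separate wish to keep `MainObj` out of the payload altogether:

```python
import pickle
import threading


class Job:
    """Hypothetical job class that owns a lock but must still be picklable."""

    def __init__(self, arg):
        self.arg = arg
        self.lock = threading.Lock()

    def __getstate__(self):
        # Copy so the live object keeps its lock, then drop the lock.
        state = self.__dict__.copy()
        del state["lock"]
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Each unpickled copy gets its own fresh, unlocked lock.
        self.lock = threading.Lock()


original = Job(3)
clone = pickle.loads(pickle.dumps(original))
print(clone.arg, clone.lock.locked())
```

The unpickled copy does not share the original lock, which is usually what you want across process boundaries anyway, since a lock cannot coordinate threads in two different processes.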
Nate
10/24/2024, 7:35 PM
Raphaël Robidas
10/24/2024, 7:35 PM
`MainObj` is pickled. It shouldn't be pickled even if it could be, since it's large and not relevant to the task.
Here is a smaller example:
```python
import time
import numpy as np
import threading
from prefect import flow, task
from prefect.futures import as_completed
from prefect_dask import DaskTaskRunner
from prefect.task_runners import ThreadPoolTaskRunner


class TaskObj:
    def __init__(self, arg):
        self.arg = arg
        self.result = None

    @task
    def run(self):
        print(f"Starting task with argument: {self.arg}")
        time.sleep(self.arg)
        self.result = self.arg
        return self


class MainObj:
    def __init__(self):
        # This object should not be pickled
        self.large_object = np.random.random((10000, 10000))
        self.lock = threading.Lock()

    # @flow(task_runner=ThreadPoolTaskRunner)
    @flow(task_runner=DaskTaskRunner())
    def run(self):
        tasks = [TaskObj(1), TaskObj(2), TaskObj(3)]
        futures = [task.run.submit() for task in tasks]
        for fut in as_completed(futures):
            res = fut.result()
            self.process_result(res)

    def process_result(self, task):
        print(f"Task {task} finished with {task.result}")
        if task.result > 2:
            # Would normally update `MainObj` with the result
            pass


if __name__ == "__main__":
    obj = MainObj()
    obj.run()
```
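One way to read the constraints above is that only the per-job work has to be serialized, while the state updates can stay on the driver side. The sketch below uses `concurrent.futures` as a stand-in for the task runner; it is not Prefect code, and whether the equivalent split (a module-level task submitted from a driver that is never serialized) works with `DaskTaskRunner` is not established in this thread. `ThreadPoolExecutor` itself does not serialize submissions, so the explicit `pickle.dumps` call only shows that the per-task payload would be serializable. `Driver` and `run_job` are hypothetical names:

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_job(arg):
    """Module-level work function: only `arg` has to be serialized with it."""
    return arg * 10  # placeholder for the real job logic


class Driver:
    """Hypothetical stand-in for MainObj; it is never handed to the executor."""

    def __init__(self):
        self.lock = threading.Lock()  # unpicklable, like MainObj.lock
        self.large_object = bytearray(1_000_000)  # stand-in for large_object
        self.results = {}

    def run(self, args):
        with ThreadPoolExecutor() as pool:
            # Only (run_job, a) is submitted; `self` stays in this thread.
            futures = {pool.submit(run_job, a): a for a in args}
            for fut in as_completed(futures):
                self.process_result(futures[fut], fut.result())

    def process_result(self, arg, result):
        # Called from the loop in run(), i.e. in the driver's own thread,
        # so it can update the object's fields directly.
        with self.lock:
            self.results[arg] = result


# What a distributed runner would need to serialize for one task:
payload = pickle.dumps((run_job, 3))
func, arg = pickle.loads(payload)
print(func(arg))

driver = Driver()
driver.run([1, 2, 3])
print(sorted(driver.results.items()))
```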
Raphaël Robidas
10/24/2024, 7:37 PM
`MainObj` is pickled here, which is problematic. If `run` calls a flow defined outside of the class, then the flow cannot modify the object's fields, which is pretty much necessary.
Nate
10/24/2024, 7:38 PM
Raphaël Robidas
10/24/2024, 7:39 PM
`TaskObj.run` be the flow, but apparently you can't queue a bunch of flows like you can with tasks.
Raphaël Robidas
10/24/2024, 7:44 PM
`MainObj` is being pickled because the `Flow` object refers to it (I ran tests to identify exactly what wasn't picklable and where it came from). Is it possible either to refer to the flow in a "shallow" way (i.e., without the containing class) or to run tasks outside of a flow? For my use, tasks and subtasks are as good as flows and tasks.
Raphaël Robidas
10/24/2024, 7:45 PM
`task.delay`, but then the task runner isn't being set to the Dask one.
Nate
10/24/2024, 7:47 PM
Raphaël Robidas
10/24/2024, 7:47 PM
Nate
10/24/2024, 7:47 PM
Raphaël Robidas
10/24/2024, 8:10 PM
Raphaël Robidas
10/24/2024, 9:26 PM
Raphaël Robidas
10/30/2024, 3:49 PM
Nate
10/30/2024, 4:50 PM