Hello everyone,
I'm facing an issue using Prefect with a Dask task runner, although it may have more to do with how Prefect works and how I use it.
A minimal example of what I'm trying to do is in the thread. I created the `AsCompleted` class based on `prefect.futures.as_completed` so that I can add new futures at any time (it works well as far as I can tell).
This code works with the `ThreadPoolTaskRunner` and produces this output:
```
Starting task with argument: 3
Starting task with argument: 2
Starting task with argument: 1
10:21:51.238 | INFO | Task run 'run-d0d' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274aa0> finished with 1
10:21:52.237 | INFO | Task run 'run-f40' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274a10> finished with 2
10:21:53.237 | INFO | Task run 'run-9cb' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17274980> finished with 3
Launching followup
Starting task with argument: 2
10:21:55.305 | INFO | Task run 'run-d92' - Finished in state Completed()
Task <__main__.TaskObj object at 0xe5f17d7dd90> finished with 2
10:21:55.509 | INFO | Flow run 'elastic-toucan' - Finished in state Completed()
```
The issue is with the `DaskTaskRunner`. `dask` needs to pickle the flow, and therefore pickles the `MainObj` instance, which is not picklable and would not be reasonable to pickle anyway (I represented this with `large_object`).
`dask` tries both `pickle` and `cloudpickle`, but both fail, with the following errors:
```
TypeError: cannot pickle '_thread.lock' object
_pickle.PicklingError: Can't pickle <function TaskObj.run at 0x1103f8869760>: it's not the same object as __main__.TaskObj.run
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x1103f81a4260>\n 0. 18708634110400\n>')
```
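A stripped-down sketch of why `TypeError: cannot pickle '_thread.lock' object` shows up. `MainObj` and `TaskObj` here are hypothetical stand-ins for the classes in the thread, a `threading.Lock` plays the role of `large_object`, and `run_plain`/`try_pickle` are made-up helpers. This only uses the standard-library `pickle`, so it approximates, rather than reproduces, what Dask and `cloudpickle` do.

```python
import pickle
import threading


class MainObj:
    """Hypothetical stand-in for MainObj from the thread."""

    def __init__(self) -> None:
        # A lock stands in for `large_object`: any object graph reaching it cannot be pickled.
        self.large_object = threading.Lock()


class TaskObj:
    """Hypothetical stand-in for TaskObj, holding a back-reference to MainObj."""

    def __init__(self, main: MainObj, arg: int) -> None:
        self.main = main
        self.arg = arg

    def run(self) -> int:
        return self.arg


def run_plain(arg: int) -> int:
    """Hypothetical module-level function that receives only plain data."""
    return arg


def try_pickle(obj) -> str:
    """Return 'ok' if obj pickles, otherwise the exception type and message."""
    try:
        pickle.dumps(obj)
        return "ok"
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"


main = MainObj()
task = TaskObj(main, 3)

# A bound method is pickled together with its instance, so task.run drags in
# task.main and, through it, the lock.
print(try_pickle(task.run))
# A module-level function (pickled by reference) plus a plain int never touches MainObj.
print(try_pickle((run_plain, task.arg)))
```

With plain `pickle`, the first call should report the same `_thread.lock` error, while the second succeeds because nothing reachable from the payload refers to `MainObj`.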
I tried moving the flow out of the object, but there is always something unpicklable in the flow that causes a similar error. Is there any way to handle this kind of problem? Any help would be a lifesaver! Thanks a lot in advance!
Some things that I can't change:
- The `DaskTaskRunner` is necessary: no other supported task runner can replace it (it interfaces with SLURM and limits tasks based on custom resources).
- `process_result` needs access to the `MainObj` instance, since it updates that instance's fields based on the task result.
- Tasks need to be launched immediately based on the results of completed tasks, hence the need for `AsCompleted`.
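For reference, the idea behind `AsCompleted` (iterating over futures as they finish while still accepting new ones) can be sketched roughly as below. This is a hypothetical illustration, not the class from the thread: it uses standard-library `concurrent.futures` futures instead of Prefect futures, and `add`, `work` and `demo` are made-up names.

```python
import concurrent.futures as cf
import time
from typing import Iterable, Iterator


class AsCompleted:
    """Yield futures as they finish, accepting new futures mid-iteration.

    Hypothetical sketch built on concurrent.futures, not Prefect's implementation.
    """

    def __init__(self, futures: Iterable[cf.Future] = ()) -> None:
        self._pending = set(futures)

    def add(self, future: cf.Future) -> None:
        # Picked up by the next wait() call, even while iteration is in progress.
        self._pending.add(future)

    def __iter__(self) -> Iterator[cf.Future]:
        while self._pending:
            # Block until at least one pending future has finished.
            done, _ = cf.wait(self._pending, return_when=cf.FIRST_COMPLETED)
            for fut in done:
                self._pending.discard(fut)
                yield fut


def work(x: int) -> int:
    # Hypothetical task: sleep proportionally to x, then return it.
    time.sleep(0.05 * x)
    return x


def demo() -> list:
    results = []
    with cf.ThreadPoolExecutor(max_workers=4) as pool:
        tasks = AsCompleted(pool.submit(work, n) for n in (3, 2, 1))
        for fut in tasks:
            value = fut.result()
            results.append(value)
            if value == 3:
                # Follow-up launched from the consumer loop, similar to the log above.
                tasks.add(pool.submit(work, 2))
    return results


print(demo())
```

The follow-up is only seen because `__iter__` re-reads the pending set on every `wait()` call; `concurrent.futures.as_completed` copies its input once at the start, so a future added later would be missed.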