# prefect-community
a
Hey, I'm trying to use multiprocessing inside a Prefect task function. It calls a function (not defined as a task) that lives outside the task. It runs fine on my local machine but raises a PicklingError when running on a local agent (see image).
d
Hey @Anish Chhaparwal! Are you using an executor when you’re running with your agent?
a
nope.
d
What’s in `query_result_list`?
Also, you’d define an executor in your `Flow` definition, something like:
```python
with Flow(name="My Flow", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    ...  # flow things
```
Executors aren’t used when calling `flow.run()` by default, but they are used when a flow run is started by an agent if you’ve configured your flow as above ☝️
a
Hey, I'm a little confused. I didn't run the flow from the agent; I registered it and then ran it from the UI. I have other flows running on the same agent that work fine.
`query_result_list` fetches documents from the database as a list. There are a few million records, so I'd like to use multiprocessing.
d
My suggestion is to make `dcmpath_check` a task and use Prefect’s built-in mapping
You can then achieve multiprocessing with the DaskExecutor: https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html#the-dask-executor
That way, your whole flow is executed using multiprocessing
I’m not sure why you’re experiencing this specific error, but I believe the above suggestion should work
Note that you don’t need to pass an address to the executor; if you omit it, a local Dask cluster will be spun up for you
a
The reason I didn't use map was that I'd have to fire up a Dask agent every time (please correct me if I'm wrong). And if I don't pass an address, how many workers are spun up by default?
d
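That’s configurable. I believe you can set the number of worker processes and threads (`n_workers` / `threads_per_worker` via the executor’s `cluster_kwargs`, or `--nprocs` / `--nthreads` on `dask-worker`), depending on how you’re running your scheduler
I’m not sure what the default is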
a
Okay, I'll give this a shot and get back to you. Meanwhile, if you can figure out what I did wrong in the first place with Python's multiprocessing, please let me know.
d
I’m not sure if you did anything wrong! Haha
Looking at your code, it feels like this should work?
Functions can be pickled
a
@Dylan I just ran the program using a mapped task with the Dask executor. It is significantly slower than plain Python multiprocessing with the same number of cores. Can you look into the original issue again? I don't understand why anything has to be pickled. I did another trial, defining the dcmpath_check function (named dc in the trial below) inside the query_result_list function itself, and I get the error shown in the image.
d
@Anish Chhaparwal can you share a reproducible example?
Prefect is set up for use in distributed environments
So, anything that needs to be passed between machines has to be serializable
We use cloudpickle for this purpose
a
okay.
d
I’m also confused by your example, though, because the multiprocessing you’re attempting should all happen inside a single task
So in theory it should work
But if a task doesn’t have access to that global function, it might be problematic
Actually, you could try defining that function within the task method?
a
I did exactly that and I'm still getting an error. Please see my previous comment (the one before "okay").
d
Can you share a reproducible example as a code snippet?
I can’t see your complete code from the pictures so far and I’d like something I can copy/paste 😄
Also, here’s some background that may be helpful: https://docs.prefect.io/orchestration/faq/dataflow.html
tl;dr `multiprocessing` uses the standard library’s `pickle`, which doesn’t work well with functions defined in `__main__` (this is one of the issues cloudpickle solves)
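A minimal, standard-library-only sketch of that tl;dr: `pickle` (which `multiprocessing` uses to ship the target function to worker processes) records a function by its module and qualified name, so it only works if that name can be looked up again. The helper `stdlib_picklable` and the functions `make_local_checker` and `orphaned_checker` are hypothetical illustrations, not Prefect or user code. `orphaned_checker` is an assumed model of the agent case, where the function object exists (for example, restored by cloudpickle) but nothing of that name is defined in the module that `pickle` looks in.

```python
import os.path
import pickle


def stdlib_picklable(func):
    """Return True if the standard-library pickle can serialize func.

    multiprocessing uses this pickle to send the target function to workers.
    """
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # Raised when func can't be found again as <module>.<qualname>
        return False


def make_local_checker():
    # Defined inside another function (like defining the helper inside the task):
    # its qualname contains "<locals>", so there is no importable name to record.
    def local_check(path):
        return path.endswith(".dcm")
    return local_check


def orphaned_checker():
    # Hypothetical model of the agent case: the function object exists, but the
    # name recorded for it (<this module>.ghost_check) is not defined anywhere.
    def ghost_check(path):
        return path.endswith(".dcm")
    ghost_check.__qualname__ = "ghost_check"
    return ghost_check


# A function imported from a real module is pickled by reference and
# restored by importing that module again.
restored = pickle.loads(pickle.dumps(os.path.splitext))
print(restored("scan.dcm"))                    # ('scan', '.dcm')
print(stdlib_picklable(make_local_checker()))  # False
print(stdlib_picklable(orphaned_checker()))    # False
```

This is also why moving the helper inside the task doesn't help: a nested function is even harder for the standard `pickle` to find than a `__main__` one.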
a
Cool, thanks for the link. I'll import the function from a library module and try again.
I'll keep you posted!
Hey, it worked when I imported the function. Thanks :)
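A sketch of the structure that fixed it, assuming the check lives in its own importable module. The file name `dcm_utils.py`, the record shape, the `check_records` helper, and the body of `dcmpath_check` are all hypothetical stand-ins, since the original code only appears in the screenshots. Because the worker function is a top-level attribute of a real module, the standard `pickle` used by `multiprocessing.Pool` can send it to the workers by reference.

```python
# dcm_utils.py  (hypothetical module name; must be importable where the agent runs)
import os.path
from multiprocessing import Pool


def dcmpath_check(record):
    """Hypothetical stand-in for the per-record check.

    Assumes each record is a dict with a "path" key and treats ".dcm" files as valid.
    """
    path = record.get("path", "")
    return os.path.splitext(path)[1].lower() == ".dcm"


def check_records(records, processes=4, chunksize=10_000):
    """Apply dcmpath_check to every record using a pool of worker processes.

    dcmpath_check is a top-level function of an importable module, so the
    standard-library pickle used by multiprocessing can send it by reference.
    """
    with Pool(processes=processes) as pool:
        # Larger chunks mean fewer round trips between the parent and the workers,
        # which matters when there are millions of small records.
        return pool.map(dcmpath_check, records, chunksize=chunksize)
```

Inside the Prefect task, you would then do `from dcm_utils import check_records` and call it on the list returned by the database query. The `processes` and `chunksize` defaults are illustrative tuning knobs, not measured values.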
d
Awesome! Glad to hear it =]