Hi everyone, I am trying to run tasks in parallel ...
# ask-community
v
Hi everyone, I am trying to run tasks in parallel (multiple tasks at the same time). I get the following error:
Copy code
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\'
k
This seems like you might have an import with `RandomNum`?
v
yes
k
What agent are you using?
v
local
k
I think if you spin up the agent in a directory that can resolve the `RandomNum` import, this should be successful
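For example, something like this (the flow directory path is a placeholder):
Copy code
# start the local agent with an extra import path so the flow's module can be resolved
prefect agent local start -p /path/to/your/flows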
v
ok let me try this
I tried to spin up the agent in a directory using the -p argument
i still get the same error
Copy code
FlowStorageError('An error occurred while unpickling the flow:\n  AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
@Kevin Kho any idea how i can get this thing to work or any other alternatives?
k
Could you show me how you are importing that?
v
Copy code
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local


class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number


class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))


flow = Flow("parallel-tasks")

stop = Parameter("stop")

number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()

stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)

sum_numbers = Sum()

sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)
flow.storage = Local()
flow.register(project_name='Parallel_Demo')

if __name__ == '__main__':
    flow.run(parameters={"stop": 5}, executor=DaskExecutor())
this is my .py file which has my flow
when i use the LocalDaskExecutor it runs my tasks serially and i can trigger it from cloud
but the DaskExecutor gives me that error
k
Could you try the functional API?
Copy code
with Flow("parallel-tasks") as flow:
    stop = Parameter("stop")

    number_1 = RandomNum()(stop)
    number_2 = RandomNum()(stop)
    number_3 = RandomNum()(stop)

    sum_numbers = Sum()([number_1, number_2, number_3])

flow.storage = Local()
flow.register(...)
v
sure
k
That’s weird though btw. It looks right
v
it's only with the DaskExecutor that i cannot trigger from cloud. I am able to run it locally in parallel
ok so i get the exact same error with the functional API
k
Thinking
Will try this
πŸ‘ 1
Do you have a Dask cluster for that DaskExecutor?
v
no
Copy code
from prefect.executors import DaskExecutor
just using this
just fyi, the logs might help: this is when i run it locally using the DaskExecutor
k
Copy code
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local


class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number


class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))


with Flow("parallel-task") as flow:
    stop = Parameter("stop", default=3)

    number_1 = RandomNum()(stop)
    number_2 = RandomNum()(stop)
    number_3 = RandomNum()(stop)

    sum_numbers = Sum()([number_1, number_2, number_3])

flow.storage = Local()
# flow.register(project_name='Parallel_Demo')
flow.executor = DaskExecutor()
if __name__ == "__main__":
    flow.run(parameters={"stop": 5})
does this work for you?
v
yes but locally
i cannot run it through prefect cloud when i trigger the flow
my prefect version is 0.15.6, if that has anything to do with it
k
ah i see ok one sec will register and run
It seems to work. Here are my logs:
Copy code
└── 17:15:12 | INFO    | Entered state <Scheduled>: Flow run scheduled.
└── 17:15:14 | INFO    | Entered state <Submitted>: Submitted for execution
└── 17:15:14 | INFO    | Submitted for execution: PID: 56451
└── 17:15:17 | INFO    | Entered state <Running>: Running flow.
└── 17:15:15 | INFO    | Beginning Flow run for 'parallel-task'
└── 17:15:17 | INFO    | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
└── 17:15:19 | INFO    | The Dask dashboard is available at http://127.0.0.1:8787/status
└── 17:15:20 | INFO    | Task 'RandomNum': Starting task run...
└── 17:15:21 | INFO    | Task 'RandomNum': Finished task run for task with final state: 'Success'
└── 17:15:23 | INFO    | Entered state <Success>: All reference tasks succeeded.
v
so it does work locally. but try to trigger the flow from the cloud UI
k
i did, these are the logs from the UI. could you show me your logs?
You can run like this: `prefect run --project bristech --name parallel-task --watch`. Just replace with your project name and this will trigger a cloud run on the agent
ohh actually, can you try registering without your `if __name__ == "__main__"` section? Maybe that has something to do with it
Yes, I replicated it. What's causing it is this:
Copy code
flow.register("bristech")
flow.executor = DaskExecutor()
if __name__ == "__main__":
    flow.run(parameters={"stop": 5})
There will be a double run with `flow.run()`, which causes that weird Dask issue. If you register without the `if __name__ == "__main__"` part, I think your code will work
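A minimal sketch of a registration script that avoids the double run (flow definition elided, project name taken from earlier in the thread):
Copy code
from prefect.executors import DaskExecutor
from prefect.storage.local import Local

# ... RandomNum, Sum, and flow defined as above ...

flow.storage = Local()
flow.executor = DaskExecutor()  # attach the executor to the flow itself

if __name__ == "__main__":
    # register only when this script is run directly; no flow.run() here,
    # so nothing executes as a side effect when the agent loads the flow
    flow.register(project_name="Parallel_Demo")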
v
@Kevin Kho Thanks for helping. I tried without the `if __name__ == "__main__"` but i still get the same error:
Copy code
(python37) Vamsis-MacBook-Pro:~ vamsi$ prefect run --project Parallel_Demo --name parallel-tasks --param stop=5 --watch
Looking up flow metadata... Done
Creating run for flow 'parallel-tasks'... Done
└── Name: tactful-elephant
└── UUID: c40eee80-cf74-40a8-b517-ca3bda2bb062
└── Labels: ['Vamsis-MacBook-Pro.local']
└── Parameters: {'stop': 5}
└── Context: {}
└── URL: https://cloud.prefect.io/humanyze-eng/flow-run/c40eee80-cf74-40a8-b517-ca3bda2bb062
Watching flow run execution...
└── 09:59:45 | INFO    | Entered state <Scheduled>: Flow run scheduled.
└── 09:59:50 | INFO    | Entered state <Submitted>: Submitted for execution
└── 09:59:50 | INFO    | Submitted for execution: PID: 25010
└── 09:59:50 | INFO    | Entered state <Failed>: Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
└── 09:59:51 | ERROR   | Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
Flow run failed!
k
This is after re-registering?
v
yes
i deleted the project and re-registered
k
Could you give me your current code?
v
sure
Copy code
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local


class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number


class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))


flow = Flow("parallel-tasks")

stop = Parameter("stop")

number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()

stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)

sum_numbers = Sum()

sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)
flow.storage = Local()
flow.register(project_name='Parallel_Demo')
flow.run(parameters={"stop": 5}, executor=DaskExecutor())
k
I believe you need to take out the run line
v
if i run this program locally without the `if __name__ == "__main__"` block i get these errors/warnings, but the flow/tasks still finish correctly:
Copy code
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
k
I think you will run into problems keeping that
flow.run()
line for registration
But really though if you are not using a Dask cluster, I think you won’t run into these problems with LocalDaskExecutor
And I think this might also work if you have a cluster, just using the DaskExecutor without a cluster is causing this stuff
v
ok but the LocalDaskExecutor does not run tasks in parallel
I will try running it with a Dask Cluster and check
k
If it still doesn't work, I think it's just a matter of removing the `flow.run()` when you register
v
i was trying to follow this example: https://docs.prefect.io/core/idioms/parallel.html
k
Gotcha, I think this used to work, but I run into the same error. Could you tell me your dask and distributed versions?
How do you know the LocalDaskExecutor is not running in parallel? Did you try a `time.sleep()` in the task?
v
I don't think i have dask installed separately
I am just importing the DaskExecutor from the Prefect library
k
That will install dask for you, so you can do `pip show dask` and `pip show distributed`. Also, if you do:
Copy code
class RandomNum(Task):
    def run(self, stop):
        import time
        time.sleep(5)
        number = randrange(stop)
        print(f"Your number is {number}")
        return number
with LocalDaskExecutor, this will run in parallel
v
so in a sense this is not truly parallel, as i make my task wait for 5 seconds? sort of a workaround?
ok so prefect must have installed dask as a dependency for me. here are the version details:
Copy code
(python37) Vamsis-MacBook-Pro:~ vamsi$ pip show dask
Name: dask
Version: 2021.9.1
Summary: Parallel PyData with Task Scheduling
Home-page: https://github.com/dask/dask/
Author: None
Author-email: None
License: BSD
Location: /Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages
Requires: partd, pyyaml, toolz, packaging, cloudpickle, fsspec
Required-by: prefect, distributed
(python37) Vamsis-MacBook-Pro:~ vamsi$ pip show distributed
Name: distributed
Version: 2021.9.1
Summary: Distributed scheduler for Dask
Home-page: https://distributed.dask.org
Author: None
Author-email: None
License: BSD
Location: /Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages
Requires: msgpack, toolz, sortedcontainers, setuptools, tblib, pyyaml, tornado, psutil, cloudpickle, click, dask, jinja2, zict
Required-by: prefect
(python37) Vamsis-MacBook-Pro:~ vamsi$
k
No no. The `time.sleep(5)` will show you that if you run it across 3 tasks, the total execution time will be roughly 5 seconds instead of 15, so it is being parallelized with the LocalDaskExecutor
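A quick way to check that, as a rough sketch (the `Sleeper` task is hypothetical, just for timing):
Copy code
import time

from prefect import Flow, Task
from prefect.executors import LocalDaskExecutor

class Sleeper(Task):
    def run(self):
        time.sleep(5)

with Flow("timing-check") as flow:
    t1 = Sleeper()()
    t2 = Sleeper()()
    t3 = Sleeper()()

if __name__ == "__main__":
    start = time.time()
    flow.run(executor=LocalDaskExecutor())
    # roughly 5s if the three tasks run in parallel, ~15s if serial
    print(f"elapsed: {time.time() - start:.1f}s")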
v
ok let me try this and see
I want to see on the gantt chart three tasks starting at the same time 🀞
so i tried running using the LocalDaskExecutor with time.sleep(5)
The tasks do not start at the same time
k
One sec will try
πŸ‘ 1
v
each of them waits for 5 sec
k
Did you still have the `flow.run()` in there?
v
i think yes
k
Ok will try again
v
but without the `flow.run()` how would it know that it needs to use the LocalDaskExecutor πŸ€”
k
v
this is what i wanted 😯
k
oh oops i edited the code
this is my current code
Copy code
from random import randrange

from prefect import Flow, Parameter, Task

class RandomNum(Task):
    def run(self, stop):
        import time
        time.sleep(5)
        number = randrange(stop)
        print(f"Your number is {number}")
        return number

class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))

flow = Flow("parallel-execution")

stop = Parameter("stop")

number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()

stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)

sum_numbers = Sum()

sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)

from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor()
flow.register("bristech")
flow.run(parameters={"stop": 5}, executor=LocalDaskExecutor())
I think maybe you just didn’t attach the executor?
v
ok let me check
Thank you very much @Kevin Kho πŸŽ‰πŸ™Œ
k
Oh nice. Glad we got that figured out
v
i was missing this: `flow.executor = LocalDaskExecutor()`
so DaskExecutor is only recommended if we have our own cluster, otherwise LocalDaskExecutor seems to be the best choice?
k
I would say so, because it just adds more overhead for no reason. I believe the DaskExecutor creates a local cluster, whereas the LocalDaskExecutor is a multiprocessing pool. So with multiprocessing, if you use threads, you can still share data as long as it is thread safe. With the DaskExecutor's LocalCluster, you have to make copies of the data to pass them to workers, I think
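As a rough sketch of the two configurations (assumes a `flow` object defined elsewhere; the cluster address is a placeholder):
Copy code
from prefect.executors import DaskExecutor, LocalDaskExecutor

# local parallelism via a threading/multiprocessing pool; no cluster needed
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)

# or, with an existing Dask cluster, point the DaskExecutor at its scheduler
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")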
v
Thank you very much. This will be helpful for us in designing our workflows
πŸ‘ 1
k
Of course!
y
Is it necessary to call `flow.run(parameters={"stop": 5}, executor=LocalDaskExecutor())` to make it use Dask? Can i just do `flow.executor = LocalDaskExecutor()` and register my flow on the server, so when it is run by the server, it will use Dask?
k
Yes, `flow.run` is really just for testing. For production you should do what you are suggesting.
y
Hmmm... Well, i've got code like this for my flows:
Copy code
...
flow.run_config = DockerRun(image=DOCKER_IMAGE_NAME) 
flow.storage = Module("nvkprefectflows.flows.myflow.flow")
flow.executor = DaskExecutor()
flow.register(project_name=PROJECT_NAME, idempotency_key=flow.serialized_hash())
...
And i can't make it actually use Dask when i start my flows from cloud.prefect.io. My agent executes my flow like it has LocalExecutor() set. Meanwhile, when i run it locally it works fine.
k
You can run the flow with debug level logs and it logs the executor. Also, what happens if you use the `LocalDaskExecutor`?
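One way to get debug level logs, as a sketch (using Prefect's environment-variable config; assumes a local agent):
Copy code
# raise the log level before starting the agent
export PREFECT__LOGGING__LEVEL=DEBUG
prefect agent local start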
y
Debug log:
Copy code
Oct 22 18:19:26 prefect prefect[25378]: [2021-10-22 18:19:26,899] INFO - Simple Alpha | Completed deployment of flow run 4bb5703d-229d-4de5-8c21-8cb2cf643508
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'test_parallel_flow'
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] DEBUG - prefect.CloudFlowRunner | Flow 'test_parallel_flow': Handling state change from Scheduled to Running
Yes, it's using the LocalExecutor... I didn't see any difference between DaskExecutor() and LocalDaskExecutor()
k
Are you importing your executor definition from somewhere else? Or really just doing `DaskExecutor()`? What storage are you using?
y
Everything is from `prefect`:
Copy code
from prefect.run_configs import DockerRun
from prefect.storage import Module
from prefect.executors import DaskExecutor
Looks like somehow my flow is registered with LocalExecutor() instead of DaskExecutor()...
k
Let me look a bit into Module storage
Did you maybe re-build your module after adding the executor?
y
No, the code above is part of a "flow deploying" script, and setting the executor is the last step before registering
k
Ohh. I understand. You import the flow and then set the executor right before registering?
y
Yep.
k
Ok, so this is tricky, but the reason it doesn't work is that when you register a flow, info such as storage, result, and run configuration is all serialized and then sent to Prefect. The executor is not part of that, so when the flow gets loaded from storage, Prefect looks for the executor in that file.
if you do `flow.executor = …` in the Flow file, I think it should work. The executor is not serialized because it can contain stuff like a Dask cluster address that users may not want to give us.
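A sketch of that split, reusing the module path and names from the snippet above (flow body elided):
Copy code
# nvkprefectflows/flows/myflow/flow.py -- the module that Module storage points at
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("test_parallel_flow") as flow:
    ...  # task definitions

# set the executor here, in the flow's own module, so Prefect finds it
# when it re-imports the flow at run time
flow.executor = DaskExecutor()

# deploy script -- everything set here is serialized at registration,
# but the executor would not be, which is why it lives in the module above:
# from nvkprefectflows.flows.myflow.flow import flow
# flow.run_config = DockerRun(image=DOCKER_IMAGE_NAME)
# flow.storage = Module("nvkprefectflows.flows.myflow.flow")
# flow.register(project_name=PROJECT_NAME)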
y
hmmm... it wasn't obvious, but makes sense. will try
k
I know it’s not. I had to dig deep to understand myself.
y
Yes, with the executor set in the Flow file everything works fine. Well, if i can't specify it after import, then ok, but i really wanted to separate "writing" flows from all the stuff related to "running" flows, and the executor setting is the only one out of line
Thanks for your help
k
Yeah that’s the only one for sure. No problem!
y
And can you link to the exact place in the source code where we exclude `executor` from serialization? Maybe i'll try to patch it for myself
k
I don't think it's that simple to patch, because it requires backend changes to store it in our database. Let me ask the team and get back to you on what the plans are around this.
πŸ™ 1