Vamsi Reddy
10/07/2021, 6:35 PM
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\'
Kevin Kho
RandomNum?
Vamsi Reddy
10/07/2021, 6:36 PM
Kevin Kho
Vamsi Reddy
10/07/2021, 6:36 PM
Kevin Kho
RandomNum import, this should be successful.
Vamsi Reddy
10/07/2021, 6:37 PM
Vamsi Reddy
10/07/2021, 7:02 PM
Vamsi Reddy
10/07/2021, 7:02 PM
Vamsi Reddy
10/07/2021, 7:21 PM
FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
Vamsi Reddy
10/07/2021, 7:35 PM
Kevin Kho
Vamsi Reddy
10/07/2021, 8:17 PM
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local
class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number
class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))
flow = Flow("parallel-tasks")
stop = Parameter("stop")
number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()
stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)
sum_numbers = Sum()
sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)
flow.storage = Local()
flow.register(project_name='Parallel_Demo')
if __name__ == '__main__':
    flow.run(parameters={"stop": 5}, executor=DaskExecutor())
Vamsi Reddy
10/07/2021, 8:17 PM
Vamsi Reddy
10/07/2021, 8:17 PM
Vamsi Reddy
10/07/2021, 8:18 PM
Vamsi Reddy
10/07/2021, 8:18 PM
Vamsi Reddy
10/07/2021, 8:18 PM
Kevin Kho
with Flow() as flow:
    stop = Parameter("stop")
    number_1 = RandomNum(upstream_tasks=[stop])()
    number_2 = RandomNum(upstream_tasks=[stop])()
    number_3 = RandomNum(upstream_tasks=[stop])()
    sum_numbers = Sum()([number_1, number_2, number_3])
flow.storage = Local()
flow.register(...)
Vamsi Reddy
10/07/2021, 8:28 PM
Kevin Kho
Vamsi Reddy
10/07/2021, 8:30 PM
Vamsi Reddy
10/07/2021, 8:30 PM
Vamsi Reddy
10/07/2021, 8:35 PM
Vamsi Reddy
10/07/2021, 8:35 PM
Kevin Kho
Kevin Kho
Kevin Kho
Vamsi Reddy
10/07/2021, 8:51 PM
Vamsi Reddy
10/07/2021, 8:51 PM
from prefect.executors import DaskExecutor
just using this
Vamsi Reddy
10/07/2021, 8:53 PM
Vamsi Reddy
10/07/2021, 8:53 PM
Vamsi Reddy
10/07/2021, 8:53 PM
Kevin Kho
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local
class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number
class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))
with Flow("parallel-task") as flow:
    stop = Parameter("stop", default=3)
    number_1 = RandomNum()(stop)
    number_2 = RandomNum()(stop)
    number_3 = RandomNum()(stop)
    sum_numbers = Sum()([number_1, number_2, number_3])
flow.storage = Local()
# flow.register(project_name='Parallel_Demo')
flow.executor = DaskExecutor()
if __name__ == "__main__":
    flow.run(parameters={"stop": 5})
Kevin Kho
Vamsi Reddy
10/07/2021, 9:00 PM
Vamsi Reddy
10/07/2021, 9:01 PM
Vamsi Reddy
10/07/2021, 9:02 PM
Kevin Kho
Kevin Kho
└── 17:15:12 | INFO | Entered state <Scheduled>: Flow run scheduled.
└── 17:15:14 | INFO | Entered state <Submitted>: Submitted for execution
└── 17:15:14 | INFO | Submitted for execution: PID: 56451
└── 17:15:17 | INFO | Entered state <Running>: Running flow.
└── 17:15:15 | INFO | Beginning Flow run for 'parallel-task'
└── 17:15:17 | INFO | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
└── 17:15:19 | INFO | The Dask dashboard is available at http://127.0.0.1:8787/status
└── 17:15:20 | INFO | Task 'RandomNum': Starting task run...
└── 17:15:21 | INFO | Task 'RandomNum': Finished task run for task with final state: 'Success'
└── 17:15:23 | INFO | Entered state <Success>: All reference tasks succeeded.
Vamsi Reddy
10/07/2021, 9:43 PM
Kevin Kho
Kevin Kho
Kevin Kho
prefect run --project bristech --name parallel-task --watch
Just replace it with your project name, and this will trigger a Cloud run on the agent.
Kevin Kho
if __name__ == "__main__" section? Maybe that has something to do with it.
Kevin Kho
flow.register("bristech")
flow.executor = DaskExecutor()
if __name__ == "__main__":
    flow.run(parameters={"stop": 5})
There will be a double run with flow.run(), which causes that weird Dask issue.
Kevin Kho
Vamsi Reddy
10/08/2021, 2:03 PM
(python37) Vamsis-MacBook-Pro:~ vamsi$ prefect run --project Parallel_Demo --name parallel-tasks --param stop=5 --watch
Looking up flow metadata... Done
Creating run for flow 'parallel-tasks'... Done
└── Name: tactful-elephant
└── UUID: c40eee80-cf74-40a8-b517-ca3bda2bb062
└── Labels: ['Vamsis-MacBook-Pro.local']
└── Parameters: {'stop': 5}
└── Context: {}
└── URL: https://cloud.prefect.io/humanyze-eng/flow-run/c40eee80-cf74-40a8-b517-ca3bda2bb062
Watching flow run execution...
└── 09:59:45 | INFO | Entered state <Scheduled>: Flow run scheduled.
└── 09:59:50 | INFO | Entered state <Submitted>: Submitted for execution
└── 09:59:50 | INFO | Submitted for execution: PID: 25010
└── 09:59:50 | INFO | Entered state <Failed>: Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
└── 09:59:51 | ERROR | Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'RandomNum\' on <module \'prefect.__main__\' from \'/Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages/prefect/__main__.py\'>")')
Flow run failed!
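The traceback above matches the generic symptom of unpickling an object whose class was pickled by reference. In that case pickle records only a module name and a class name, and the loading process must be able to look that name up again. Judging from the traceback, `RandomNum` was recorded under `__main__`, but in the process that loads the flow, `__main__` is `prefect/__main__.py`, which does not define it. The minimal stdlib-only sketch below reproduces the same kind of `AttributeError`. The module name `demo_flows` and the helpers `make_module` and `reproduce` are hypothetical, and this is not Prefect's storage code.

```python
import pickle
import sys
import types

# Hypothetical module name standing in for the script that defined the tasks.
MODULE_NAME = "demo_flows"
SOURCE = '''
class RandomNum:
    """Plain stand-in for the Task subclass in the thread (not a Prefect Task)."""
    def __init__(self, stop):
        self.stop = stop
'''


def make_module(name, source):
    """Create a module object, run `source` inside it, and register it in sys.modules."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)  # classes defined here get __module__ == name
    sys.modules[name] = module
    return module


def reproduce():
    """Pickle an instance, then unpickle it where the class name no longer resolves."""
    # "Registration" side: the class exists, so pickling succeeds.
    # pickle stores the reference demo_flows.RandomNum, not the class body.
    module = make_module(MODULE_NAME, SOURCE)
    payload = pickle.dumps(module.RandomNum(stop=5))

    # "Flow-run" side: the same module name now points at a module that does
    # not define RandomNum (analogous to prefect/__main__.py in the thread).
    sys.modules[MODULE_NAME] = types.ModuleType(MODULE_NAME)
    try:
        pickle.loads(payload)
    except AttributeError as exc:
        return exc  # e.g. "Can't get attribute 'RandomNum' on <module 'demo_flows'>"
    finally:
        del sys.modules[MODULE_NAME]  # clean up the fake module either way
    return None


if __name__ == "__main__":
    print(repr(reproduce()))
```

In general, a pickled-by-reference class can only be restored if the loading process can import it under the same module path. One common way to satisfy that is to keep task classes in an importable module rather than in the script that runs as `__main__`.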
Kevin Kho
Vamsi Reddy
10/08/2021, 2:04 PM
Vamsi Reddy
10/08/2021, 2:04 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:06 PM
Vamsi Reddy
10/08/2021, 2:06 PM
import logging
from random import randrange
from prefect import Flow, Parameter, Task
from prefect.executors import DaskExecutor
from prefect.storage.local import Local
class RandomNum(Task):
    def run(self, stop):
        number = randrange(stop)
        print(f"Your number is {number}")
        logging.info(f"Your number is {number}")
        return number
class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
        logging.info(sum(numbers))
flow = Flow("parallel-tasks")
stop = Parameter("stop")
number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()
stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)
sum_numbers = Sum()
sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)
flow.storage = Local()
flow.register(project_name='Parallel_Demo')
flow.run(parameters={"stop": 5}, executor=DaskExecutor())
Kevin Kho
Vamsi Reddy
10/08/2021, 2:09 PM
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
    if __name__ == '__main__':
        freeze_support()
        ...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Kevin Kho
flow.run() line for registration
Kevin Kho
Kevin Kho
Vamsi Reddy
10/08/2021, 2:12 PM
Vamsi Reddy
10/08/2021, 2:12 PM
Kevin Kho
flow.run() when you register
Vamsi Reddy
10/08/2021, 2:13 PM
Kevin Kho
Kevin Kho
time.sleep() in the task?
Vamsi Reddy
10/08/2021, 2:16 PM
Vamsi Reddy
10/08/2021, 2:16 PM
Kevin Kho
pip show dask and pip show distributed. Also, if you do:
from random import randrange
from prefect import Task
class RandomNum(Task):
    def run(self, stop):
        import time
        time.sleep(5)
        number = randrange(stop)
        print(f"Your number is {number}")
        return number
with LocalDaskExecutor, this will run in parallel.
Vamsi Reddy
10/08/2021, 2:20 PM
Vamsi Reddy
10/08/2021, 2:22 PM
(python37) Vamsis-MacBook-Pro:~ vamsi$ pip show dask
Name: dask
Version: 2021.9.1
Summary: Parallel PyData with Task Scheduling
Home-page: https://github.com/dask/dask/
Author: None
Author-email: None
License: BSD
Location: /Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages
Requires: partd, pyyaml, toolz, packaging, cloudpickle, fsspec
Required-by: prefect, distributed
(python37) Vamsis-MacBook-Pro:~ vamsi$ pip show distributed
Name: distributed
Version: 2021.9.1
Summary: Distributed scheduler for Dask
Home-page: https://distributed.dask.org
Author: None
Author-email: None
License: BSD
Location: /Users/vamsi/opt/anaconda3/envs/python37/lib/python3.7/site-packages
Requires: msgpack, toolz, sortedcontainers, setuptools, tblib, pyyaml, tornado, psutil, cloudpickle, click, dask, jinja2, zict
Required-by: prefect
(python37) Vamsis-MacBook-Pro:~ vamsi$
Kevin Kho
time.sleep(5) will show you that if you map across 3 times, the total execution time will be roughly 5 seconds instead of 15, so it is being parallelized with the LocalDaskExecutor.
Vamsi Reddy
10/08/2021, 2:25 PM
Vamsi Reddy
10/08/2021, 2:25 PM
Vamsi Reddy
10/08/2021, 2:29 PM
Vamsi Reddy
10/08/2021, 2:30 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:30 PM
Kevin Kho
Kevin Kho
flow.run() in there?
Vamsi Reddy
10/08/2021, 2:35 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:36 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:37 PM
Kevin Kho
Kevin Kho
Kevin Kho
from random import randrange
from prefect import Flow, Parameter, Task
class RandomNum(Task):
    def run(self, stop):
        import time
        time.sleep(5)
        number = randrange(stop)
        print(f"Your number is {number}")
        return number
class Sum(Task):
    def run(self, numbers):
        print(sum(numbers))
flow = Flow("parallel-execution")
stop = Parameter("stop")
number_1 = RandomNum()
number_2 = RandomNum()
number_3 = RandomNum()
stop.set_downstream(number_1, key="stop", flow=flow)
stop.set_downstream(number_2, key="stop", flow=flow)
stop.set_downstream(number_3, key="stop", flow=flow)
sum_numbers = Sum()
sum_numbers.bind(numbers=[number_1, number_2, number_3], flow=flow)
from prefect.executors import LocalDaskExecutor
flow.executor = LocalDaskExecutor()
flow.register("bristech")
flow.run(parameters={"stop": 5}, executor=LocalDaskExecutor())
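The `time.sleep(5)` argument comes down to wall-clock arithmetic. Three independent 5-second tasks take about 15 s back to back, but about 5 s if they overlap. LocalDaskExecutor uses a thread pool by default, so `concurrent.futures.ThreadPoolExecutor` gives a rough stdlib analogue of the same effect. The sketch below makes a few assumptions so it runs quickly: the 0.5 s sleep, the `random_num`/`run_sequential`/`run_threaded` names, and the thread-pool substitution are illustrative choices, not Prefect code.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from random import randrange

SLEEP_SECONDS = 0.5  # assumption: shortened stand-in for the 5 s sleep in the thread


def random_num(stop):
    """Stdlib analogue of RandomNum.run: wait, then return a random int below stop."""
    time.sleep(SLEEP_SECONDS)
    return randrange(stop)


def run_sequential(stop, n=3):
    """Run n tasks one after another; return (sum, elapsed seconds)."""
    start = time.perf_counter()
    numbers = [random_num(stop) for _ in range(n)]
    return sum(numbers), time.perf_counter() - start


def run_threaded(stop, n=3):
    """Run n tasks concurrently on a thread pool; return (sum, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n) as pool:
        numbers = list(pool.map(random_num, [stop] * n))
    return sum(numbers), time.perf_counter() - start


if __name__ == "__main__":
    _, seq = run_sequential(5)
    _, par = run_threaded(5)
    print(f"sequential: {seq:.2f}s, threaded: {par:.2f}s")
```

With these sleep lengths, the sequential run should take a little over 1.5 s and the threaded run a little over 0.5 s. That is the same 3:1 ratio as the 15 s versus 5 s described in the thread.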
Kevin Kho
Vamsi Reddy
10/08/2021, 2:40 PM
Vamsi Reddy
10/08/2021, 2:42 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:43 PM
Vamsi Reddy
10/08/2021, 2:43 PM
Vamsi Reddy
10/08/2021, 2:43 PM
Kevin Kho
Vamsi Reddy
10/08/2021, 2:53 PM
Kevin Kho
Yury Cheremushkin
10/22/2021, 5:54 PM
flow.run(parameters={"stop": 5}, executor=LocalDaskExecutor())
to make it use Dask? Can I just set flow.executor = LocalDaskExecutor() and register my flow on the server, so that when the server runs it, it will use Dask?
Kevin Kho
flow.run is really just for testing. For production, you should do what you are suggesting.
Yury Cheremushkin
10/22/2021, 6:13 PM
...
flow.run_config = DockerRun(image=DOCKER_IMAGE_NAME)
flow.storage = Module("nvkprefectflows.flows.myflow.flow")
flow.executor = DaskExecutor()
flow.register(project_name=PROJECT_NAME, idempotency_key=flow.serialized_hash())
...
And I can't make it actually use Dask when I start my flows from cloud.prefect.io. My agent executes my flow as if it had LocalExecutor() set. Meanwhile, when I run it locally, it works fine.
Kevin Kho
LocalDaskExecutor?
Yury Cheremushkin
10/22/2021, 6:21 PM
Oct 22 18:19:26 prefect prefect[25378]: [2021-10-22 18:19:26,899] INFO - Simple Alpha | Completed deployment of flow run 4bb5703d-229d-4de5-8c21-8cb2cf643508
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'test_parallel_flow'
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] DEBUG - prefect.CloudFlowRunner | Using executor type LocalExecutor
Oct 22 18:19:36 prefect prefect[25378]: [2021-10-22 18:19:27+0000] DEBUG - prefect.CloudFlowRunner | Flow 'test_parallel_flow': Handling state change from Scheduled to Running
Yes, it's using LocalExecutor... I didn't see any difference between DaskExecutor() and LocalDaskExecutor().
Kevin Kho
DaskExecutor()? What storage are you using?
Yury Cheremushkin
10/22/2021, 6:32 PM
from prefect.run_configs import DockerRun
from prefect.storage import Module
from prefect.executors import DaskExecutor
Looks like somehow my flow is registered with LocalExecutor() instead of DaskExecutor()...
Kevin Kho
Kevin Kho
Yury Cheremushkin
10/22/2021, 6:37 PM
Kevin Kho
Yury Cheremushkin
10/22/2021, 6:38 PM
Kevin Kho
Kevin Kho
flow.executor = … in the Flow file, I think it should work. The executor is not serialized because it can contain things like a Dask cluster address that users may not want to give us.
Yury Cheremushkin
10/22/2021, 6:43 PM
Kevin Kho
Yury Cheremushkin
10/22/2021, 6:57 PM
Yury Cheremushkin
10/22/2021, 6:57 PM
Kevin Kho
Yury Cheremushkin
10/22/2021, 7:00 PM
executor from serialization? Maybe I'll try to patch it for myself.
Kevin Kho