Tomer Cagan
03/10/2022, 8:38 AM
… worker_client inside a task to be able to do my own dynamic parallelization (a requirement for the algorithm that I run).
I saw in the Dask documentation that it is possible to specify a "single-threaded" scheduler for debugging, and that is my end goal here.
I have the following code:
```python
import time
import random
from typing import List

from prefect import Flow, task, Parameter
from prefect.storage import Docker
from prefect.executors import DaskExecutor, LocalDaskExecutor
from dask.distributed import worker_client


def fib(value: int) -> int:
    # Simulate slow processing with a random delay of up to 2 seconds
    delay = random.random() * 2
    print(f"calculating fib({value}). sleeping {delay} seconds to simulate slow processing")
    time.sleep(delay)
    if value < 2:
        return value
    # worker_client() only works when this code runs on a dask.distributed worker
    with worker_client() as client:
        a_future = client.submit(fib, value - 1)
        b_future = client.submit(fib, value - 2)
        a, b = client.gather([a_future, b_future])
        return a + b


@task(log_stdout=True)
def compute_with_dask(values: List[int]) -> int:
    max_value = max(values)
    print(f"got a list of size {len(values)} - starting long process to calculate fib({max_value})")
    results = fib(max_value)
    print(f"got results of calculation, the value is {results}")
    return results


@task
def load_stuff():
    return [1, 2, 3, 4]


with Flow("fib-dask-flow") as flow:
    loaded = load_stuff()
    compute_with_dask(loaded)

flow.storage = Docker(
    registry_url="docker.k8s.nextsilicon.com/repository/container-ns/",
    image_name="research-dask-fib",
)
flow.executor = DaskExecutor()

if __name__ == "__main__":
    flow.executor = LocalDaskExecutor("synchronous")
    flow.storage = None
    flow.run()
    print("running main")
```
And I am getting the following error:
```
% python flow_recur.py
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'load_stuff': Starting task run...
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'load_stuff': Finished task run for task with final state: 'Success'
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'compute_with_dask': Starting task run...
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | got a list of size 4 - starting long process to calculate fib(11)
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | calculating fib(4). sleeping 1.8652929878459892 seconds to simulate slow processing
[2022-03-10 10:42:35+0200] ERROR - prefect.TaskRunner | Task 'compute_with_dask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4219, in get_worker
    return thread_state.execution_state["worker"]
AttributeError: '_thread._local' object has no attribute 'execution_state'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4222, in get_worker
    return first(
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/toolz/itertoolz.py", line 376, in first
    return next(iter(seq))
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/tomercagan/dev/prefect-play/flow_recur.py", line 40, in compute_with_dask
    results = fib(max_value)
  File "/Users/tomercagan/dev/prefect-play/flow_recur.py", line 17, in fib
    with worker_client() as client:
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker_client.py", line 52, in worker_client
    worker = get_worker()
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4228, in get_worker
    raise ValueError("No workers found")
ValueError: No workers found
[2022-03-10 10:42:35+0200] INFO - prefect.TaskRunner | Task 'compute_with_dask': Finished task run for task with final state: 'Failed'
[2022-03-10 10:42:35+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
running main
prefect-play %
```
This happens to me whether I use "synchronous" or leave the LocalDaskExecutor constructor empty.
I am guessing that I am doing something wrong because there's not much there.
Also, running this as a registered flow with Prefect Server works fine. …
```python
if __name__ == "__main__":
    print("running main")
    executor = DaskExecutor()
    flow.storage = None
    flow.run(executor=executor)
```
I am also able to stop at a breakpoint (Visual Studio Code), but it is hard to debug because multiple threads are running...
Anna Geller
03/10/2022, 10:10 AM
… fib function without Prefect, I think you should see the same error; let me know if that is not the case. But if it is, and you are dealing purely with a Dask issue, you may get a better answer than I can give you by posting your question on the Dask Discourse 🙂
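Task looping, which Anna suggests for the Fibonacci example, means re-running one task and carrying state from one iteration to the next instead of recursively fanning out two futures per call. In Prefect 1.0 a task does this by raising `prefect.engine.signals.LOOP(result=...)` and reading the previous iteration's payload from `prefect.context.get("task_loop_result")`. The sketch below only mimics that control flow in plain Python so it runs without Prefect; `Loop`, `fib_loop_step`, and `run_task_loop` are hypothetical names, and the driver loop stands in for what Prefect's task runner would do.

```python
from typing import Dict, Optional


class Loop(Exception):
    """Hypothetical stand-in for Prefect 1.0's LOOP signal (illustration only)."""

    def __init__(self, result: Dict[str, int]):
        super().__init__("loop")
        self.result = result


def fib_loop_step(n: int, loop_result: Optional[Dict[str, int]]) -> int:
    """One 'task run': advance the Fibonacci pair by one step, or finish.

    loop_result plays the role of prefect.context["task_loop_result"].
    """
    if n < 0:
        raise ValueError("n must be non-negative")
    # a holds fib(i) and b holds fib(i + 1)
    state = loop_result or {"i": 0, "a": 0, "b": 1}
    if state["i"] == n:
        return state["a"]  # returning (instead of raising) ends the loop
    raise Loop({"i": state["i"] + 1, "a": state["b"], "b": state["a"] + state["b"]})


def run_task_loop(n: int) -> int:
    """Mimics the task runner: re-run the step, feeding back each Loop result."""
    loop_result = None
    while True:
        try:
            return fib_loop_step(n, loop_result)
        except Loop as signal:
            loop_result = signal.result


print(run_task_loop(10))  # 55
```

Each iteration depends only on the previous one, so there is nothing to parallelize; for a linear recurrence like Fibonacci this avoids the exponential number of submitted futures that the recursive `worker_client` version creates.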
Lastly, for things like your Fibonacci example, you may actually be better off using something like task looping rather than parallel processing.
Tomer Cagan
03/10/2022, 10:23 AM
```python
from dask.distributed import Client

if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(fib, 10)
    print(future.result())
```
It works fine and I can debug, so I don't think it is a Dask issue. I believe something is wrong with how I initialize the Dask cluster. Also, with DaskExecutor it works fine and I can debug as well.
Anna Geller
03/10/2022, 10:44 AM
… DockerRun(..., host_config={"xxx": xxx})
Maybe for such use cases you are indeed better off using Orion, since you could simply run this directly in your flow:
```python
from dask.distributed import Client
from prefect import flow


@flow
def your_flow_function():
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(fib, 10)
    print(future.result())
```
Tomer Cagan
03/10/2022, 11:47 AM
… flow.run()
I took your advice and ran some tests of my own; the flow I currently have works nicely on Prefect 1.0 and gives me roughly what I wanted (minus the extra observability that Prefect provides).
In the example you provided, I think the flow will spawn its own cluster, which is not what I am trying to do.
As for Orion: I like a lot of the new ideas there, but I am not sure I will be able to sell it to the team, especially since right now, in order to "piggyback" onto the Prefect-spawned Dask cluster, I would have to use async, which the team here does not like much (plus it is still in beta).
I ought to write an article at the end of my discovery process 🙂
Anna Geller
03/10/2022, 12:02 PM
… flow.run(), since storage is built at registration (not at runtime). Docker was a red herring here 🙂
Just to be sure we are on the same page: the Orion client is async, but you don't have to use async tasks/flows, and you don't need to know anything about async to use Orion, especially if you use your own Python code to achieve parallelism in the flow and don't rely on any prebuilt task runners.
And for Prefect 1.0, I wonder whether spinning up a long-running standalone Dask cluster might help in your case: that way, regardless of whether you run the flow locally or trigger a backend run, you can use the same client configuration in your task. There are some tradeoffs here, but I want to make sure you are aware of that option as well: https://discourse.prefect.io/t/are-there-any-guidelines-for-using-a-temporary-vs-static-dask-cluster-how-to-set-one-on-kubernetes/342/2#h-3-[…]ter-5
Tomer Cagan
03/10/2022, 12:28 PM
… DaskExecutor to run locally with a single worker and debug with it...
Anna Geller
03/10/2022, 12:39 PM
Kevin Kho
03/10/2022, 2:53 PM
… a_future and b_future simultaneously trying to read values from different workers. Very unsure. But I think this might be more stable with processes as the scheduler? It looks like the threads are interfering with each other, so otherwise you'd need a lock. Really unsure.
Tomer Cagan
03/13/2022, 8:15 AM
… client.submit, it creates a Future and not a Delayed (though, tbh, I am not exactly sure what the difference is 😉).
I will try to understand the SO thread more deeply and maybe learn from it, but I am not using locks explicitly.
Also, I am confused about why it works OK with DaskExecutor and not with LocalDaskExecutor (with and without ("synchronous")).
Is there a way to configure DaskExecutor for a single worker with a single thread? I guess that could help me with debugging (in the general sense, not this issue specifically).
Anna Geller
03/13/2022, 9:40 AM
> I am confused why it works ok with DaskExecutor and not with LocalDaskExecutor (with and without ("synchronous"))
LocalDaskExecutor relies on multithreading or multiprocessing without creating a local Dask cluster. In contrast, DaskExecutor creates a local Dask cluster.
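This difference explains the `ValueError: No workers found` in the traceback earlier in the thread: under LocalDaskExecutor the task runs on a plain thread or process, not on a `dask.distributed` worker, so `worker_client()` has no worker to attach to. A minimal sketch, assuming you want `fib` to keep working (serially) in that case, is to check for a worker first and fall back to plain recursion. `running_on_dask_worker` and the `max_delay` parameter are hypothetical additions; the check relies on `distributed.get_worker()` raising `ValueError` when no worker is found, as it does in that traceback.

```python
import random
import time

try:
    from distributed import get_worker, worker_client
except ImportError:  # distributed is not installed, so we cannot be on a worker
    get_worker = None
    worker_client = None


def running_on_dask_worker() -> bool:
    """Hypothetical helper: True only when called inside a dask.distributed worker."""
    if get_worker is None:
        return False
    try:
        get_worker()
        return True
    except ValueError:  # raised when there is no worker (message wording varies by version)
        return False


def fib(value: int, max_delay: float = 0.0) -> int:
    # max_delay is a hypothetical knob (not in the original) so tests can skip sleeping
    time.sleep(random.random() * max_delay)
    if value < 2:
        return value
    if not running_on_dask_worker():
        # LocalDaskExecutor or plain Python: no worker, so recurse serially
        return fib(value - 1, max_delay) + fib(value - 2, max_delay)
    # DaskExecutor: fan out through the worker's client as in the original code
    with worker_client() as client:
        a_future = client.submit(fib, value - 1, max_delay)
        b_future = client.submit(fib, value - 2, max_delay)
        a, b = client.gather([a_future, b_future])
    return a + b


print(fib(10))  # 55 when run outside a Dask worker
```

Under DaskExecutor the function still fans out through `worker_client()`, so only the local debugging path changes, and it becomes a single-threaded recursion that is easy to step through with a breakpoint.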
https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374
Tomer Cagan
03/13/2022, 9:44 AM
```python
if __name__ == "__main__":
    print("running main")
    executor = DaskExecutor(debug=True, cluster_kwargs={"n_workers": 1})
    flow.storage = None
    flow.run(executor=executor)
```
Sometimes it seems to get stuck when I stop and rerun it many times with a breakpoint.
Anna Geller
03/13/2022, 11:42 AM
… flow.run()
Tomer Cagan
03/13/2022, 11:50 AM
Madhup Sukoon
03/16/2022, 6:41 PM
Zanie
03/16/2022, 6:42 PM
If a command is found but is not executable, the return status is 126.
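Exit status 126 means the shell found the command but could not execute it (for example, a script copied into an image without its execute bit), as opposed to 127 for a command that was not found at all. A small sketch to reproduce both codes locally, assuming a POSIX `sh` on the PATH; the file names `not_executable.sh` and `does_not_exist` are made up for the demo.

```python
import os
import shlex
import stat
import subprocess
import tempfile


def shell_exit_status(command: str) -> int:
    """Run `command` through sh -c and return its exit status."""
    return subprocess.run(["sh", "-c", command], capture_output=True).returncode


with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "not_executable.sh")
    with open(script, "w") as f:
        f.write("#!/bin/sh\necho hello\n")
    # Read/write permissions only: the file exists but has no execute bit
    os.chmod(script, stat.S_IRUSR | stat.S_IWUSR)
    found_not_executable = shell_exit_status(shlex.quote(script))
    not_found = shell_exit_status(shlex.quote(os.path.join(tmp, "does_not_exist")))

print(found_not_executable, not_found)  # 126 127
```

In a container, the usual fix for 126 is to `chmod +x` the script in the image (or invoke it through an interpreter, e.g. `sh script.sh`).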
Madhup Sukoon
03/16/2022, 6:44 PM
Zanie
03/16/2022, 6:47 PM
Madhup Sukoon
03/16/2022, 6:47 PM
Zanie
03/16/2022, 6:48 PM
… prefect flow-run execute in the container
Madhup Sukoon
03/16/2022, 6:49 PM
Zanie
03/16/2022, 6:49 PM
```shell
docker run -t <your-image> prefect flow-run execute
```
Madhup Sukoon
03/16/2022, 6:49 PM