# prefect-community
t
Hi, Trying to understand how I can use Prefect with Dask while still being able to debug locally - I am getting an error. More details, code and errors inside the thread
A few notes: I am using `worker_client` inside a task to be able to do my own dynamic parallelization (a requirement for the algorithm that I run). I saw in the Dask documentation that it is possible to specify the `"single-threaded"` scheduler for debugging, and that is my end goal here. I have the following code:
Copy code
import time
import random
from typing import List
from prefect import Flow, task, Parameter
from prefect.storage import Docker
from prefect.executors import DaskExecutor, LocalDaskExecutor
from dask.distributed import worker_client


def fib(value: int) -> int:
    delay = random.random() * 2
    print(f"calculating fib({value}). sleeping {delay} seconds to simulate slow processing")
    time.sleep(delay)
    if value < 2:
        return value
    with worker_client() as client:
        a_future = client.submit(fib, value - 1)
        b_future = client.submit(fib, value - 2)
        a, b = client.gather([a_future, b_future])
        return a + b


@task(log_stdout=True)
def compute_with_dask(values: List[int]) -> int:
    max_value = max(values)
    print(f"got a list of size {len(values)} - starting long process to calculate fib({max_value})")
    results = fib(max_value)
    print(f"got results of calculation, the value is {results}")

@task
def load_stuff():
    return [1,2,3,4]

with Flow("fib-dask-flow") as flow:
    loaded = load_stuff()
    compute_with_dask(loaded)


flow.storage = Docker(
    registry_url="<http://docker.k8s.nextsilicon.com/repository/container-ns/|docker.k8s.nextsilicon.com/repository/container-ns/>",
    image_name="research-dask-fib",
)

flow.executor = DaskExecutor()

if __name__ == "__main__":
    flow.executor = LocalDaskExecutor("synchronous")
    flow.storage = None
    flow.run()
    print("running main")
And I am getting the following error:
Copy code
% python flow_recur.py 
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'load_stuff': Starting task run...
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'load_stuff': Finished task run for task with final state: 'Success'
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | Task 'compute_with_dask': Starting task run...
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | got a list of size 4 - starting long process to calculate fib(11)
[2022-03-10 10:42:33+0200] INFO - prefect.TaskRunner | calculating fib(4). sleeping 1.8652929878459892 seconds to simulate slow processing
[2022-03-10 10:42:35+0200] ERROR - prefect.TaskRunner | Task 'compute_with_dask': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4219, in get_worker
    return thread_state.execution_state["worker"]
AttributeError: '_thread._local' object has no attribute 'execution_state'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4222, in get_worker
    return first(
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/toolz/itertoolz.py", line 376, in first
    return next(iter(seq))
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/tomercagan/dev/prefect-play/flow_recur.py", line 40, in compute_with_dask
    results = fib(max_value)
  File "/Users/tomercagan/dev/prefect-play/flow_recur.py", line 17, in fib
    with worker_client() as client:
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker_client.py", line 52, in worker_client
    worker = get_worker()
  File "/Users/tomercagan/dev/prefect-play/venv/lib/python3.9/site-packages/distributed/worker.py", line 4228, in get_worker
    raise ValueError("No workers found")
ValueError: No workers found
[2022-03-10 10:42:35+0200] INFO - prefect.TaskRunner | Task 'compute_with_dask': Finished task run for task with final state: 'Failed'
[2022-03-10 10:42:35+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
running main
prefect-play %
This happens to me whether I am using `"synchronous"` or leave the `LocalDaskExecutor` constructor empty. I am guessing that I am doing something wrong because there's not much there. Also, when running this as a registered flow with Prefect Server, it works fine
This does work:
Copy code
if __name__ == "__main__":
    print("running main")
    executor = DaskExecutor()
    flow.storage = None
    flow.run(executor=executor)
I am also able to stop at a breakpoint (Visual Studio Code), but it is hard to debug due to multiple threads running...
a
I may be wrong here, but I believe that attaching a Dask executor to your flow matters for:
• mapping
• running tasks independently of each other in parallel
If you implement your own worker client within your Prefect task, you:
1. Don't use Prefect's parallelism, which is why I think the choice of executor likely doesn't matter much in this example flow - you should see the same behavior even without attaching any executor
2. Lose the observability that Prefect provides to each mapped child task when parallelizing your processing (which may be a totally valid option in your use case, just bringing it up for you to know)
If you run this script with the `fib` function without Prefect, I think you should see the same error - LMK if this is not true. But if it is true and it's purely a Dask issue you are dealing with, you may get a smarter response than I can give you if you post your question in the Dask Discourse 🙂 Lastly, when it comes to things like your Fibonacci example, you may actually be better off doing something like task looping rather than parallel processing
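For illustration, a task-looping version of Fibonacci could look roughly like this - just a sketch using the LOOP signal, iterative rather than the recursive fan-out you described:
Copy code
import prefect
from prefect import task
from prefect.engine.signals import LOOP


@task
def fib_loop(n: int) -> int:
    # Loop state is carried between iterations in context under "task_loop_result"
    payload = prefect.context.get("task_loop_result", {"a": 0, "b": 1, "i": 0})
    a, b, i = payload["a"], payload["b"], payload["i"]
    if i >= n:
        return a  # returning a value ends the loop
    # Raising LOOP tells Prefect to re-run this task with the updated payload
    raise LOOP(message=f"fib iteration {i}", result={"a": b, "b": a + b, "i": i + 1})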
t
If I change my main to this:
Copy code
from dask.distributed import Client  # needed in addition to worker_client

if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(fib, 10)
    print(future.result())
It works fine and I can debug - so I don't think it is a Dask issue. I believe it is something wrong with how I initialize the Dask cluster. Also, with `DaskExecutor` it works fine and I can also debug.
I understand the issues you brought up. As I stated before, the algorithm we are trying to orchestrate is kind of dynamic and recursive and I am not sure that task looping would work in this case.
BTW, in the code above, I am not implementing my own worker client - I am piggy-backing on the dask cluster already created by Prefect. So it will be running additional tasks in parallel to the "Prefect tasks" (at least as far as I understand)
a
Gotcha, I remember we had this conversation before and recommending a resource manager for such use cases. Usually users who need access to the Dask client initialize it directly with a resource manager - but I agree that, given you initialize this within a task, it shouldn't matter. Perhaps running this within a Docker container is the issue? Sometimes it's hard to allocate enough resources from the VM to the container, and you may need to change your Docker settings or use host config:
Copy code
DockerRun(..., host_config={"xxx": xxx})
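In case it helps, the resource manager pattern I mentioned would look roughly like this - just a sketch, not tailored to your flow, and note it spins up its own cluster rather than reusing Prefect's:
Copy code
from dask.distributed import Client
from prefect import Flow, resource_manager, task


@resource_manager
class DaskCluster:
    def __init__(self, n_workers: int = 1):
        self.n_workers = n_workers

    def setup(self) -> Client:
        # The return value is what `as client` below receives
        return Client(n_workers=self.n_workers, threads_per_worker=1)

    def cleanup(self, client: Client):
        client.close()


@task
def compute(client, values):
    # Use the client handed in by the resource manager
    return client.submit(max, values).result()


with Flow("resource-manager-example") as flow:
    with DaskCluster(n_workers=1) as client:
        compute(client, [1, 2, 3, 4])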
Maybe for such use cases you're indeed better off using Orion, since you could simply run something like this directly in your flow function:
Copy code
from dask.distributed import Client
from prefect import flow  # Orion-style flow decorator


@flow
def your_flow_function():
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(fib, 10)
    print(future.result())
t
Yeah, we've kind of carried this conversation over several threads. I am not running on Docker there - it is running locally - note how in the main I reset the storage and I just do `flow.run()`. I took your advice and did some of my own tests, and this flow that I currently have works nicely on Prefect 1.0 and sort of gives me what I wanted (minus the extra observability that Prefect provides). In the example you provided (I think), that flow will spawn its own cluster, which is not what I am trying to do. As for Orion - I like a lot of the new ideas there, but I am not sure whether I will be able to sell it to the team - especially since right now, in order to "piggyback" onto the Prefect-spawned Dask cluster, I would have to use async, which the team here does not like so much (plus it being in a beta stage). I ought to write some article at the end of my discovery process 🙂
a
Writing an article about your experience would be fantastic! 🙏 Regarding Docker storage, even if you don't override it, it won't be used when doing `flow.run()`, since storage is built at registration (not at runtime) - Docker was a red herring here 🙂 Just to be sure we are on the same page: the Orion client is async, but you don't have to use async tasks/flows and you don't need to know anything about async to use Orion, especially if you use your custom Python code to achieve parallelism in the flow and you don't rely on any prebuilt task runners. And for Prefect 1.0, given your use case, I wonder whether spinning up a long-running standalone Dask cluster may help - this way, regardless of whether you run it locally or trigger a backend run, you leverage the same client configuration in your task. There are some tradeoffs here, but just making sure you are aware of that option as well: https://discourse.prefect.io/t/are-there-any-guidelines-for-using-a-temporary-vs-static-dask-cluster-how-to-set-one-on-kubernetes/342/2#h-3-[…]ter-5
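If you do go the standalone-cluster route, pointing the executor at it would look roughly like this - the scheduler address below is just a placeholder:
Copy code
from prefect.executors import DaskExecutor

# Connect to an existing long-running cluster instead of spawning a temporary one.
# "tcp://dask-scheduler:8786" is a placeholder - use your scheduler's real address.
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")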
t
Hi, about Orion - I tried to run the same code there and got an error that I posted in a previous discussion, and according to @Zanie, right now I would have to use an async task in order for it to work - and everything this entails. About a long-running cluster - I don't know if this would really help me here - it won't lend itself to local debugging, and one of the issues we have with our current system is that tasks interfere with each other, causing starvation and sometimes crashes of the whole system. We are starting to migrate to k8s and I like the idea of ephemeral Dask clusters... I think at the end of the day, I can probably configure a `DaskExecutor` to run locally with a single worker and debug using it...
a
You're definitely right - if your workloads are competing for resources, then using a temporary cluster, e.g. on Kubernetes, is a smart choice. Sorry, I didn't know about the async issue you mentioned, I need to catch up 😅
k
I was reading this and I think the issue might be the submission of `a_future` and `b_future` simultaneously trying to read a value from different workers. Very unsure. But I think this might be more stable with processes as the scheduler? It looks like the threads are hitting each other, so you'd otherwise need a lock. Really unsure
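Something like this is what I mean - just a sketch, and I'm not sure it fixes the worker_client error since that still needs a distributed worker:
Copy code
from prefect.executors import LocalDaskExecutor

# Use the multiprocessing scheduler instead of threads for Prefect's own tasks
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=4)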
upvote 1
t
Mmmm, I believe that with `client.submit`, it creates a `Future` and not a `Delayed` (though, tbh, I am not exactly sure what the difference is 😉). I will try to understand the SO thread more deeply and maybe I can learn from it, but I am not using locks explicitly. Also, I am confused why it works OK with `DaskExecutor` and not with `LocalDaskExecutor` (with and without "synchronous"). Is there a way to configure `DaskExecutor` for a single worker and a single thread? I guess that could possibly help me in debugging (in the general sense, not this issue specifically)
a
> I am confused why it works OK with `DaskExecutor` and not with `LocalDaskExecutor` (with and without "synchronous")
`LocalDaskExecutor` relies on multithreading and multiprocessing without creating a local Dask cluster. In contrast, `DaskExecutor` creates a local Dask cluster. https://discourse.prefect.io/t/what-is-the-difference-between-a-daskexecutor-and-a-localdaskexecutor/374
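And to your single-worker question - something like this should keep the temporary cluster as small as possible (a sketch, untested):
Copy code
from prefect.executors import DaskExecutor

# A real distributed cluster (so worker_client() works), but limited to a
# single in-process worker with one thread to make breakpoint debugging easier.
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 1, "threads_per_worker": 1, "processes": False},
)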
t
Wow, didn't expect an answer until tomorrow afternoon. Thanks for the info - I will review and see whether it somehow leads me in a good direction...
👍 1
This sort of works* for debugging:
Copy code
if __name__ == "__main__":
    print("running main")
    executor = DaskExecutor(debug=True, cluster_kwargs={"n_workers": 1})
    flow.storage = None
    flow.run(executor=executor)
Sometimes it seems to get stuck in case I am stopping / running many times with a breakpoint
a
Nice! BTW no need to set storage to None - storage and run_config are ignored when you do `flow.run()`
👌 1
t
That's good information - thanks. Part of what I am trying to understand is how to manage development so that the researchers can seamlessly move from local development and debugging to running automated, scheduled "tests" of the code
👍 1
m
Hi, I was setting up a flow and got this error. Anyone know what exit code 126 is?
z
If a command is found but is not executable, the return status is 126.
m
Could you elaborate a little please, or point me to the documentation?
Also, how do I know which command is causing this?
z
I just googled the exit code 😄
These are standard Linux exit codes, not Prefect-related.
What image are you using?
m
Oh I see, thanks! I am using a custom docker image that I just created.
z
To run a flow, we call `prefect flow-run execute` in the container
m
Okay, let me try running that manually in the container.
z
You can try it locally with `docker run -t <your-image> prefect flow-run execute`
👍
m
Thanks a lot!