Zachary Piazza

07/18/2023, 3:58 PM
Hi folks. I am trying to get started using prefect + dask but I ran into some problems right away. I am able to submit tasks to the dask cluster I created using the kubernetes operator but all the tasks fail immediately with the following exception:
10:52:38.256 | INFO    | Task run 'print_hello-99' - Crash detected! Execution was interrupted by an unexpected exception: Exception: PrefectHTTPStatusError("Client error '404 Not Found' for url 'http://ephemeral-prefect/api/task_runs/7fa61d80-fdb2-490f-b0f6-e4d8709103b5/set_state'\nResponse: {'exception_message': 'Task run with id 7fa61d80-fdb2-490f-b0f6-e4d8709103b5 not found'}\nFor more information check: https://httpstatuses.com/404")
The code is very basic
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def print_hello(name):
    return f"Hello {name}!"

@flow(
    task_runner=DaskTaskRunner(address="localhost:9000"),
)
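def hello_world():
    with open('names.txt') as f:
        names = f.read().splitlines()
    # Keep each future so the greetings can be collected once the tasks finish
    futures = [print_hello.submit(name) for name in names]
    return [future.result() for future in futures]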

if __name__ == "__main__":
    greetings = hello_world()
    print(greetings)

Emil Christensen

07/18/2023, 4:11 PM
@Zachary Piazza is the flow running locally? Most likely what’s happening is that the Dask workers, which are running inside the cluster, are communicating with their own local Prefect backend, which is different from your local Prefect backend. In other words, the task run exists in your environment, but not in the Dask worker environment. The way to fix it is to have both your local machine and the Dask cluster point to the same backend. You can do that by using Prefect Cloud or by hosting a server.
👀 1
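The `ephemeral-prefect` host in the 404 above fits this diagnosis: a process with no API URL configured talks to its own temporary backend. Below is a minimal sketch of a check that could be run in both environments, assuming the URL is supplied through the `PREFECT_API_URL` environment variable (Prefect can also read it from a profile, which this sketch does not inspect). `configured_api_url` and `backend_summary` are hypothetical helper names, not part of Prefect.

```python
import os
from typing import Mapping, Optional


def configured_api_url(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return PREFECT_API_URL from the given environment, or None if unset or blank."""
    url = env.get("PREFECT_API_URL", "").strip()
    return url or None


def backend_summary(env: Mapping[str, str] = os.environ) -> str:
    """Describe which backend a process with this environment would likely use."""
    url = configured_api_url(env)
    if url is None:
        # Assumption: with no URL configured, the process uses its own ephemeral API.
        return "no PREFECT_API_URL set: likely using a private ephemeral API"
    return f"shared API at {url}"


if __name__ == "__main__":
    print(backend_summary())
```

If the summary differs between the local machine and a Dask worker, the two are probably not sharing a backend.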

Zachary Piazza

07/18/2023, 4:13 PM
oh thank you so much. i think i see what i need to do now. give me a second
👍 1
Okay so I used the `prefect kubernetes manifest server` command to generate the YAML spec and then port forwarded the service port (4200) to my local machine and set the URL using `prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"`
It now says "All connection attempts failed"

Emil Christensen

07/18/2023, 4:20 PM
When does it say "All connection attempts failed"? Could you share your full stack trace? The Dask workers also need to be able to access the server. I’d highly recommend starting out with Prefect Cloud rather than self-hosting if possible.

Zachary Piazza

07/18/2023, 4:22 PM
Yeah I'm sure the cloud offering is easier but I really want to make this work with open source 🙂
I don't see a full stack trace in the logs just the following messages for each task execution
11:15:34.213 | INFO    | Task run 'print_hello-99' - Crash detected! Request to http://127.0.0.1:4200/api/task_runs/cdcffbd8-aace-4e4d-b80a-aba5473b355b/set_state failed: ConnectError: All connection attempts failed.
Do I need to set the prefect URL from the dask cluster definition as well?

Emil Christensen

07/18/2023, 4:24 PM
Right, so the Dask worker(s) likely won’t be able to access that server since 127.0.0.1 for them is different than it is for you. You are port forwarding from the cluster to your local instance, but inside the Dask worker (think of it as a separate machine) nothing is running on 127.0.0.1
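The loopback problem described here can be caught before submitting any tasks. The sketch below flags an API URL whose host only makes sense on the machine that set it; `is_loopback_api_url` is a hypothetical helper, and the non-loopback hostnames used in the check are made up for illustration.

```python
import ipaddress
from urllib.parse import urlparse


def is_loopback_api_url(api_url: str) -> bool:
    """Return True if the URL's host refers to the machine the caller runs on."""
    host = urlparse(api_url).hostname
    if host is None:
        raise ValueError(f"no host found in {api_url!r}")
    if host == "localhost":
        return True
    try:
        # Covers 127.0.0.0/8 and the IPv6 loopback address ::1.
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (for example a DNS name); treat it as non-loopback.
        return False


if __name__ == "__main__":
    url = "http://127.0.0.1:4200/api"
    if is_loopback_api_url(url):
        print(f"{url} is only reachable from the machine that set it")
```

A URL that passes this check is not guaranteed to be reachable from the workers; it only rules out the specific case where every machine resolves the address to itself.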

Zachary Piazza

07/18/2023, 4:25 PM
so whatever I said for PREFECT_API_URL on the prefect cli gets forwarded to the workers?

Emil Christensen

07/18/2023, 4:25 PM
You might want to look at our Helm charts for more deployment options.
> so whatever I said for PREFECT_API_URL on the prefect cli gets forwarded to the workers?
Yep, that’s right… any Prefect settings get propagated.

Zachary Piazza

07/18/2023, 4:28 PM
Woooo I think I got it 🙂
🙌 1