
Avi A

08/23/2020, 11:13 AM
Hey, I’m using a LocalEnvironment / DaskCluster to run my flow on a local cluster. Where does Prefect report the LocalCluster provisioning logs? I’m interested in what port the UI is served on (port 8787 is already taken on the server)
ping. does anyone know how to answer this? I actually also want to be able to connect to that scheduler from inside one of the tasks, and I don’t know how

Jenny

09/08/2020, 2:19 PM
Hi Avi - are you wanting to know the default base URL for the UI when you're using Server? It's http://localhost:8080. If that wasn't what you were asking, can you give me a bit more detail? Thanks.

Avi A

09/08/2020, 2:21 PM
no, I want to read the attributes of the dask scheduler that spawns when the job is running. I was expecting to find it in the prefect context when the flow is running, but it’s not there
the default scheduler address is localhost:8786, but if, for example, I’m running several flows on the same agent, with each of them spawning a local DaskCluster, I have no idea which scheduler belongs to the flow I’m currently running in. I hope this makes it a bit clearer.

Jenny

09/08/2020, 2:23 PM
Ah - thanks for the clarification - that makes more sense. Let me check with the team.

Avi A

09/08/2020, 2:23 PM
Thanks @Jenny

Jim Crist-Harif

09/08/2020, 2:48 PM
> I actually also want to be able to connect to that scheduler from inside one of the tasks and I don’t know how
I don't recommend doing this based on the address of the dask cluster; instead, use a worker_client to get a client while inside a task running on a worker. Docs: https://distributed.dask.org/en/latest/api.html#distributed.worker_client
from distributed import worker_client
from prefect import task

@task
def example():
    with worker_client() as client:
        # do your dask stuff here
        pass

Avi A

09/08/2020, 2:49 PM
OMG this is super cool!
@Jim Crist-Harif how do I make this client the default client? I’m not actually submitting a task on my own, I’m running a computation on a dask dataframe and would like the computation to be done on the existing cluster. I currently do this by:
client = Client(existing_cluster_addr) # this implicitly makes the client the default execution entry for dask operations
...

dask_dataframe.operations.compute()
so if I put Dask DataFrame operations inside the context in the code snippet you provided, will it run on the client from the context?

Jim Crist-Harif

09/08/2020, 3:02 PM
I don't recommend setting it as the default client; that will bleed outside of that single task and may affect other tasks. Rather, I recommend passing scheduler=client to .compute to specify which client to use.

Avi A

09/08/2020, 3:03 PM
sounds like a plan. I thought scheduler could only receive an address. Thanks a lot!