# prefect-community

Tony Piazza

09/29/2022, 6:19 PM
Prefect appears to be trying to launch my Dask cluster 3 times when I run my flow:
```
2022-09-29 11:50:53 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:53 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:59 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:51:05 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:51:05 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
```
The simple flow is decorated as follows:
```python
@flow(
    description = 'Calculates revenues for wells associated with a specific forecast',
    task_runner = CoiledTaskRunner(config_name=WELL_REVENUE_CONFIG)
)
def forecast_well_revenues(request: ForecastJobRequest) -> None:
    # invoke tasks here
```
Can someone explain why this is happening?
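
(Editor's sketch, not part of the original message.) To put numbers on the repetition, the log above can be tallied with a few lines of standard-library Python. The regular expression and variable names are illustrative assumptions; the log text is copied from the message. Note that every line reports the same cluster id, which suggests repeated connections to one cluster rather than three separate clusters, though the thread does not confirm this.

```python
import re
from collections import Counter
from datetime import datetime

# Log lines copied verbatim from the message above.
LOG = """\
2022-09-29 11:50:53 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:53 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:59 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:51:05 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:51:05 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready                               Workers: 3 ready (of 3)
"""

# Illustrative pattern: timestamp, then one of the three message prefixes.
EVENT = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) - coiled - INFO - "
    r"(?P<event>Using existing cluster|Creating Cluster|Scheduler: ready)",
    re.MULTILINE,
)

matches = list(EVENT.finditer(LOG))
counts = Counter(m.group("event") for m in matches)

# Timestamps of each "Creating Cluster" line and the gaps between them.
created = [
    datetime.strptime(m.group("ts"), "%Y-%m-%d %H:%M:%S")
    for m in matches
    if m.group("event") == "Creating Cluster"
]
gaps = [(b - a).total_seconds() for a, b in zip(created, created[1:])]

print(dict(counts))
print(gaps)
```

Running the same tally on the log of a minimal flow makes it easy to compare how many connection attempts each variant produces.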

Zanie

09/29/2022, 6:23 PM
cc @Andrew Huang

Andrew Huang

09/29/2022, 6:26 PM
Hi Tony, can you try running this to see if it launches your cluster three times? Also how is CoiledTaskRunner implemented?
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner

# CoiledTaskRunner is Tony's custom DaskTaskRunner subclass;
# it must be defined or imported before this snippet will run.

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=CoiledTaskRunner())
def count_to(highest_number):
    shout.submit(highest_number)

if __name__ == "__main__":
    count_to(10)
```

Tony Piazza

09/29/2022, 6:30 PM
@Andrew Huang
```python
class CoiledTaskRunner(DaskTaskRunner):
    def __init__(self, config_name: str):
        cluster_config = get_cluster_config(config_name=config_name)
        cluster = coiled.Cluster(**cluster_config)

        super().__init__(
            address=cluster.scheduler_address,
            client_kwargs={'security': cluster.security}
        )
```
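
(Editor's sketch, not part of the original message.) One general Python behavior worth keeping in mind with a runner like this: arguments to a decorator are evaluated when the decorated function is defined, so a constructor that connects to a cluster runs every time the module is executed or imported, before the flow is ever called. The thread does not establish that this explains the three connections. The sketch below uses only stand-ins (`FakeCluster`, `EagerRunner`, `LazyRunner`, and a toy `flow` decorator are all hypothetical, not the Prefect or Coiled APIs) to contrast connecting in `__init__` with deferring the connection.

```python
# Stand-ins only: FakeCluster, EagerRunner, LazyRunner and this toy `flow`
# decorator are hypothetical and are NOT the Prefect or Coiled APIs.
connections = []  # records every simulated cluster connection


class FakeCluster:
    """Pretends to connect to a remote cluster when constructed."""

    def __init__(self, name):
        connections.append(name)


class EagerRunner:
    """Connects in __init__, similar in shape to the runner shown above."""

    def __init__(self, config_name):
        self.cluster = FakeCluster(config_name)


class LazyRunner:
    """Stores the config and connects only when start() is called."""

    def __init__(self, config_name):
        self.config_name = config_name
        self.cluster = None

    def start(self):
        if self.cluster is None:
            self.cluster = FakeCluster(self.config_name)
        return self.cluster


def flow(task_runner):
    """Toy decorator that just attaches the runner to the function."""
    def wrap(fn):
        fn.task_runner = task_runner
        return fn
    return wrap


@flow(task_runner=EagerRunner("eager-demo"))
def eager_flow():
    return "ran"


@flow(task_runner=LazyRunner("lazy-demo"))
def lazy_flow():
    lazy_flow.task_runner.start()
    return "ran"


# Neither flow has been called yet, but the eager runner already connected.
connections_at_definition = list(connections)
lazy_flow()
connections_after_call = list(connections)
print(connections_at_definition, connections_after_call)
```

If the module holding the flow is imported in more than one process, the eager variant would connect once per import, which is one thing to check when counting connection attempts.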

Andrew Huang

09/29/2022, 6:35 PM
Does the code snippet above launch your cluster once?

Tony Piazza

09/29/2022, 6:50 PM
testing it now
👀 1
I'm having to run my code on WSL... and it crashed.
👀 1
@Andrew Huang to answer your question, the code you shared only launches the cluster once.

Andrew Huang

09/29/2022, 7:15 PM
What if you submit it two times; does it launch the cluster twice?
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner

# CoiledTaskRunner is Tony's custom DaskTaskRunner subclass;
# it must be defined or imported before this snippet will run.

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=CoiledTaskRunner())
def count_to(highest_number):
    shout.submit(highest_number)
    shout.submit(highest_number)

if __name__ == "__main__":
    count_to(10)
```

Tony Piazza

09/29/2022, 7:17 PM
let me try that.

Andrew Huang

09/29/2022, 7:19 PM
thanks!

Tony Piazza

09/29/2022, 7:29 PM
same thing. only started the cluster once.

Andrew Huang

09/29/2022, 7:31 PM
okay, so that rules out something going awry in the implementation of CoiledTaskRunner. can you provide more details about the tasks here:
```python
def forecast_well_revenues(request: ForecastJobRequest) -> None:
    # invoke tasks here
```
actually, one other thing to try before moving on to that is running the shout example with:
```python
@flow(task_runner=CoiledTaskRunner(config_name=config_name))
```

Tony Piazza

09/29/2022, 8:44 PM
@Andrew Huang that last snippet is actually what I tried before. I have to point to the Coiled software environment we built, which includes Prefect.
👀 1
as far as the flow is concerned, it just calls a single task that executes some simple Dask code.
a

Andrew Huang

09/29/2022, 8:47 PM
I see; perhaps the cluster is getting restarted due to memory issues
🤔 1
👀 1

Tony Piazza

09/29/2022, 9:58 PM
was talking with the folks at Coiled, who are excellent BTW, and they suggested using the worker_client context manager. unfortunately, that didn't work either.
🙌 1
using the worker_client causes this:
```
File "py-compute-poc/venv/lib/python3.10/site-packages/distributed/client.py", line 1022, in current
    raise ValueError("Not running inside the `as_current` context manager")
ValueError: Not running inside the `as_current` context manager
```
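
(Editor's sketch, not part of the original message.) The wording of this error points at a "current client" lookup that found nothing registered for the calling context. As a rough illustration of that general pattern, here is a standard-library miniature built on `contextvars`. `ToyClient` is hypothetical and is not `distributed.Client`; that the real library works along similar lines is an assumption, and the thread does not say why the lookup failed inside Tony's task.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Hypothetical miniature of a "current client" registry; ToyClient is not
# the real distributed.Client.
_current_client = ContextVar("_current_client", default=None)


class ToyClient:
    @contextmanager
    def as_current(self):
        """Register this client as current for the duration of the block."""
        token = _current_client.set(self)
        try:
            yield self
        finally:
            _current_client.reset(token)

    @classmethod
    def current(cls):
        """Return the registered client, or fail if none is registered."""
        client = _current_client.get()
        if client is None:
            raise ValueError("Not running inside the `as_current` context manager")
        return client


client = ToyClient()

# Outside any as_current() block the lookup fails with the same message.
try:
    ToyClient.current()
    outside = "found a client"
except ValueError as exc:
    outside = str(exc)

# Inside the block the lookup returns the registered client.
with client.as_current():
    inside = ToyClient.current() is client

print(outside)
print(inside)
```

In this toy model the fix is always to perform the lookup inside the registering block; whether the equivalent applies to code running in a Prefect task on a Dask worker is exactly what the rest of the thread is trying to find out.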

Andrew Huang

09/29/2022, 10:02 PM
How are you using worker_client? Is it like this:
```python
with worker_client(separate_thread=False) as client:
```

Tony Piazza

09/29/2022, 10:04 PM
```python
with worker_client() as client:
    df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
    client.persist(df)

    result = df.groupby("name").aggregate({"x": "sum", "y": "max"})

    return client.compute(result)
```
also tried this:
```python
with worker_client(separate_thread=False) as client:
    df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
    client.persist(df)

    result = df.groupby("name").aggregate({"x": "sum", "y": "max"})

    return client.compute(result)
```
they did tell me to pass separate_thread=False.
i tried both ways. got different failures.

Andrew Huang

09/29/2022, 10:14 PM
Are you running this inside a task or a flow? I think worker_client should be run inside a task.

Tony Piazza

09/30/2022, 11:34 AM
@Andrew Huang I am running it inside a task
👀 1

Andrew Huang

09/30/2022, 4:20 PM
Are you able to run worker_client with the vanilla DaskTaskRunner?