Tony Piazza
09/29/2022, 6:19 PM
2022-09-29 11:50:53 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:53 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
2022-09-29 11:50:54 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:50:59 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
2022-09-29 11:50:59 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
2022-09-29 11:51:05 - coiled - INFO - Using existing cluster: 'ghg-demo (id: 83299)'
2022-09-29 11:51:05 - coiled - INFO - Creating Cluster (name: ghg-demo, <https://cloud.coiled.io/abcd1234/clusters/83299/details> ). This might take a few minutes...
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
2022-09-29 11:51:06 - coiled - INFO - Scheduler: ready Workers: 3 ready (of 3)
The simple flow is decorated as follows:
@flow(
    description='Calculates revenues for wells associated with a specific forecast',
    task_runner=CoiledTaskRunner(config_name=WELL_REVENUE_CONFIG)
)
def forecast_well_revenues(request: ForecastJobRequest) -> None:
    # invoke tasks here
Can someone explain why this is happening?
Zanie
09/29/2022, 6:23 PM
Andrew Huang
09/29/2022, 6:26 PM
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=CoiledTaskRunner())
def count_to(highest_number):
    shout.submit(highest_number)

if __name__ == "__main__":
    count_to(10)
Tony Piazza
09/29/2022, 6:30 PM
class CoiledTaskRunner(DaskTaskRunner):
    def __init__(self, config_name: str):
        cluster_config = get_cluster_config(config_name=config_name)
        cluster = coiled.Cluster(**cluster_config)
        super().__init__(
            address=cluster.scheduler_address,
            client_kwargs={'security': cluster.security}
        )
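A note on the pattern above, offered as one possible contributor to the repeated log lines rather than a confirmed diagnosis: in Python, the arguments of a decorator are evaluated when the `def` statement runs, so a runner that builds its cluster inside `__init__` does that work every time the defining module is executed. The sketch below uses only stand-in classes (`FakeCluster`, `EagerRunner`, `LazyRunner`, and a toy `flow` decorator are all hypothetical and make no Prefect, Dask, or Coiled calls) to contrast eager construction with deferring it until first use.

```python
created = []  # records every simulated cluster creation


class FakeCluster:
    """Hypothetical stand-in for an expensive cluster object."""

    def __init__(self, name):
        created.append(name)
        self.name = name


class EagerRunner:
    """Mirrors the pattern above: the cluster is built inside __init__."""

    def __init__(self, name):
        self.cluster = FakeCluster(name)


class LazyRunner:
    """Stores only the settings and builds the cluster on first access."""

    def __init__(self, name):
        self.name = name
        self._cluster = None

    @property
    def cluster(self):
        if self._cluster is None:
            self._cluster = FakeCluster(self.name)
        return self._cluster


def flow(task_runner):
    """Toy decorator (not Prefect's) that just attaches the runner."""
    def decorator(fn):
        fn.task_runner = task_runner
        return fn
    return decorator


def define_flows(runner_cls):
    """Simulates one execution of a module that defines a decorated flow."""
    @flow(task_runner=runner_cls("ghg-demo"))
    def forecast():
        return "done"
    return forecast


# Eager: every simulated module execution creates a cluster.
define_flows(EagerRunner)
define_flows(EagerRunner)
eager_count = len(created)

# Lazy: defining the flow creates nothing; first access creates one cluster.
created.clear()
lazy_flow = define_flows(LazyRunner)
define_flows(LazyRunner)
lazy_count_before_use = len(created)
lazy_flow.task_runner.cluster
lazy_flow.task_runner.cluster
lazy_count_after_use = len(created)

print(eager_count, lazy_count_before_use, lazy_count_after_use)
```

Whether this explains the logs in this thread depends on how often the flow module is actually loaded, which the thread does not show. The prefect-dask `DaskTaskRunner` also documents `cluster_class` and `cluster_kwargs` parameters for letting the runner create the cluster itself, which may be worth checking against the installed version.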
Andrew Huang
09/29/2022, 6:35 PM
Tony Piazza
09/29/2022, 6:50 PM
Andrew Huang
09/29/2022, 7:15 PM
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=CoiledTaskRunner())
def count_to(highest_number):
    shout.submit(highest_number)
    shout.submit(highest_number)

if __name__ == "__main__":
    count_to(10)
Tony Piazza
09/29/2022, 7:17 PM
Andrew Huang
09/29/2022, 7:19 PM
Tony Piazza
09/29/2022, 7:29 PM
Andrew Huang
09/29/2022, 7:31 PM
def forecast_well_revenues(request: ForecastJobRequest) -> None:
    # invoke tasks here
@flow(task_runner=CoiledTaskRunner(config_name=config_name))
Tony Piazza
09/29/2022, 8:44 PM
Andrew Huang
09/29/2022, 8:47 PM
Tony Piazza
09/29/2022, 9:58 PM
  File "py-compute-poc/venv/lib/python3.10/site-packages/distributed/client.py", line 1022, in current
    raise ValueError("Not running inside the `as_current` context manager")
ValueError: Not running inside the `as_current` context manager
Andrew Huang
09/29/2022, 10:02 PM
with worker_client(separate_thread=False) as client:
Tony Piazza
09/29/2022, 10:04 PM
with worker_client() as client:
    df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
    client.persist(df)
    result = df.groupby("name").aggregate({"x": "sum", "y": "max"})
    return client.compute(result)

with worker_client(separate_thread=False) as client:
    df = dask.datasets.timeseries("2000", "2005", partition_freq="2w")
    client.persist(df)
    result = df.groupby("name").aggregate({"x": "sum", "y": "max"})
    return client.compute(result)
Andrew Huang
09/29/2022, 10:14 PM
Tony Piazza
09/30/2022, 11:34 AM
Andrew Huang
09/30/2022, 4:20 PM