Tomer Cagan
02/25/2022, 8:57 AM

Anna Geller
02/25/2022, 9:59 AM
KubeCluster, check out this thread: https://discourse.prefect.io/t/are-there-any-guidelines-for-using-a-temporary-vs-static-dask-cluster-how-to-set-one-on-kubernetes/342
LMK if you still have any questions about that. Happy to help more.

Kevin Kho
02/25/2022, 2:34 PM

Tomer Cagan
02/27/2022, 8:15 AM

Anna Geller
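02/27/2022, 4:00 PM
from dask.distributed import worker_client
from prefect import task

@task
def compute_describe(df):
    # worker_client() gives code running on a Dask worker a client to the same cluster
    with worker_client():
        return df.describe().compute()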
vs:
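# use ResourceManager object
# (DaskCluster is a Prefect 1 resource manager defined elsewhere; get_github_data,
# to_dataframe and to_parquet are tasks not shown in this message)
with DaskCluster(
    cluster_type=cluster_type,
    n_workers=n_workers,
    software=software,
    account=account,
    name=name,
) as client:
    push_events = get_github_data(filenames)
    df = to_dataframe(push_events)
    to_parquet(df)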
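Anna's second snippet relies on the resource-manager lifecycle: in Prefect 1, a `resource_manager` class defines a `setup` step (for example, creating a cluster) and a `cleanup` step that tears it down once the block's work is done. The stdlib-only sketch below mimics that lifecycle with a plain context manager so it runs without Prefect or Dask installed; `FakeCluster` and `temporary_cluster` are hypothetical names, not Prefect APIs.

```python
from contextlib import contextmanager


class FakeCluster:
    """Hypothetical stand-in for a Dask cluster; records whether it was closed."""

    def __init__(self, n_workers):
        self.n_workers = n_workers
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def temporary_cluster(n_workers, events):
    # setup: create a cluster that lives only for the duration of the block
    cluster = FakeCluster(n_workers)
    events.append("setup")
    try:
        yield cluster
    finally:
        # cleanup: try/finally makes this run even if the body raises
        cluster.close()
        events.append("cleanup")


events = []
with temporary_cluster(n_workers=4, events=events) as cluster:
    events.append(f"work on {cluster.n_workers} workers")

print(events)  # ['setup', 'work on 4 workers', 'cleanup']
```

The trade-off mirrors the thread: `worker_client()` reuses whatever cluster the task is already running on, while the resource-manager style creates a temporary cluster per run and owns its teardown.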
Tomer Cagan
02/28/2022, 10:27 AM

Anna Geller
02/28/2022, 11:49 AM

Tomer Cagan
03/01/2022, 6:42 AM

Anna Geller
03/01/2022, 10:53 AM

Tomer Cagan
03/07/2022, 8:05 AM
worker_client with Orion and I am getting an error; I'm not sure whether it should even work.
Here is what I tried:
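from dask.distributed import worker_client
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner  # Orion-era path; later moved to the prefect-dask collection

def add(x, y=1):
    # hypothetical helper: the original definition of add was not shared
    return x + y

@task
def my_task(*args, **kwargs):
    with worker_client() as client:
        fut = client.submit(add, 30)
        return fut.result()

@flow(name="My Example Flow", task_runner=DaskTaskRunner())
def my_flow(*args, **kwargs):
    # run parallel tasks and subflows with Dask
    return my_task()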
Then, running this (all in ipython), I am getting this output:
In [37]: state = my_flow()
10:01:41.229 | INFO | prefect.engine - Created flow run 'cherubic-bloodhound' for flow 'My Example Flow'
10:01:41.229 | INFO | Flow run 'cherubic-bloodhound' - Using task runner 'DaskTaskRunner'
10:01:41.230 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
10:01:42.100 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
10:01:42.174 | INFO | Flow run 'cherubic-bloodhound' - Created task run 'my_task-ec9685da-0' for task 'my_task'
10:01:44.861 | ERROR | Task run 'my_task-ec9685da-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/engine.py", line 703, in orchestrate_task_run
    result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 51, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
    return await future
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
    result = context.run(func, *args)
  File "<ipython-input-35-75f9eaa5372f>", line 4, in my_task
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 119, in __enter__
    return next(self.gen)
  File "/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/distributed/worker_client.py", line 55, in worker_client
    duration = time() - thread_state.start_time
AttributeError: '_thread._local' object has no attribute 'start_time'
/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/client.py:1227: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
10:01:44.901 | ERROR | Task run 'my_task-ec9685da-0' - Finished in state Failed('Task run encountered an exception.')
/Users/tomercagan/dev/orion/venv/lib/python3.9/site-packages/prefect/client.py:1227: UserWarning: No default storage has been set on the server. Using temporary local storage for results.
warnings.warn(
10:01:45.354 | ERROR | Flow run 'cherubic-bloodhound' - Finished in state Failed('1/1 states failed.')
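One plausible reading of this traceback (a hedged diagnosis, not one stated in the thread): the frames show Prefect running `my_task` through `run_sync_in_worker_thread`, i.e. in an anyio worker thread, after which `worker_client()` reads `thread_state.start_time`. That attribute appears to be set only on the threads a Dask worker uses to execute its own tasks, and attributes on a `threading.local` object are invisible from any other thread, which matches the `AttributeError`. The stdlib sketch below reproduces only that mechanism; `thread_state` and the two thread functions are hypothetical stand-ins, not code from distributed.

```python
import threading

# Hypothetical analogue of per-thread state that a Dask worker would set
# on its own executor thread before running a task.
thread_state = threading.local()


def executor_thread_body(results):
    thread_state.start_time = 123.0  # set only for this thread
    results["executor"] = hasattr(thread_state, "start_time")


def other_thread_body(results):
    # A different thread sees a fresh, empty namespace on the same object
    results["other"] = hasattr(thread_state, "start_time")


results = {}
for target in (executor_thread_body, other_thread_body):
    t = threading.Thread(target=target, args=(results,))
    t.start()
    t.join()

print(results)  # {'executor': True, 'other': False}
```

Under this reading, the failure depends on which thread calls `worker_client()`, not on the code inside the `with` block.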
Am I doing something wrong, or is this not possible in Orion?

Kevin Kho
03/07/2022, 2:02 PM

Tomer Cagan
03/07/2022, 2:24 PM

Kevin Kho
03/07/2022, 3:15 PM

Zanie
03/07/2022, 3:32 PM

Tomer Cagan
03/08/2022, 7:21 AM

Zanie
03/08/2022, 4:53 PM