Is it possible make `dask.compute` calls without u...
# ask-community
d
Is it possible to make `dask.compute` calls without using a DaskTaskRunner? We’re just using dask ddfs for lazy evaluation of large databases and aren’t actually spinning up a dask server. When I evaluate the df, it runs a flow on each dask partition. However, the last 3 always fail with
Crash detected! Execution was cancelled by the runtime environment
I’m not getting any additional error messages that might help debug this, but I’m wondering if it’s a limitation of calling `dask.compute` without using DaskTaskRunner.
n
hi @Derek - yes!
> Is it possible to make `dask.compute` calls without using a DaskTaskRunner?
In [2]: import dask
   ...: import dask.dataframe as dd
   ...: import pandas as pd
   ...:
   ...: from prefect import flow
   ...:
   ...: def extract_data():
   ...:     data = {'id': [1, 2, 3, 4, 5], 'value': [10, 20, 30, 40, 50]}
   ...:     return dd.from_pandas(pd.DataFrame(data), npartitions=1)
   ...:
   ...: def transform_data(df):
   ...:     df['value'] = df['value'] * 2
   ...:     return df
   ...:
   ...: def load_data(df):
   ...:     return df.compute()
   ...:
   ...: @flow
   ...: def etl_pipeline():
   ...:     return load_data(transform_data(extract_data()))
   ...:
   ...: if __name__ == "__main__":
   ...:     final_df = etl_pipeline()
   ...:     print(final_df)
   ...:
11:36:49.052 | INFO    | prefect.engine - Created flow run 'tuscan-rhino' for flow 'etl-pipeline'
11:36:49.052 | INFO    | prefect.engine - View at <http://0.0.0.0:4200/runs/flow-run/c04bd322-e440-4af2-8a58-82ce9927f09b>
11:36:49.122 | INFO    | Flow run 'tuscan-rhino' - Finished in state Completed()
   id  value
0   1     20
1   2     40
2   3     60
3   4     80
4   5    100
you should be able to just install `dask[distributed]` directly and use it in flows if you want
this might also be an option for you https://github.com/PrefectHQ/prefect/pull/13537
d
The issue with prefect-dask is the versioning conflict we have in our platform (it’s pinned to 2022.4.0). That’s why I need to explore alternate routes 🙂 Slight modification to the code snippet you provided: we are calling the flow on the ddf, so the flow only executes on each partition when `.compute()` is called.
So the flow is essentially running within dask’s coroutine
Basically, we’ve set up a highly concurrent flow to speed up our network calls. But that runs on a sample of our dataset in pandas, and now we’re trying to replace the longer-running call that runs on dask ddfs (since these datasets exceed memory capacity). I was hoping it would be as easy as swapping out the network call portion with the new flow we’ve built, so that it runs during the `dd.compute()` call.
(also, any tips on how to root cause this error message?)
n
are you using prefect 2 or 3? if you're open to it, I'd recommend 3 bc the engine has been rewritten for performance etc (GA next week), so CoE would be a bit different between the two
d
We’re hoping to upgrade to 3 soon but need to upgrade some dependencies first (specifically Pydantic). Also, what does CoE refer to here?
n
> We’re hoping to upgrade to 3 soon but need to upgrade some dependencies (specifically Pydantic)
makes sense! oh sorry, cause of error
without more of the trace / code
> Crash detected! Execution was cancelled by the runtime environment
it's hard to know what's going on here
this could be any old sigterm / oom etc
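One way to get more of a trace is to log failures at the partition boundary before they propagate. The sketch below is an assumption-laden illustration, not a Prefect or Dask feature: `log_failures` and `process_partition` are hypothetical names. It catches `BaseException` so that cancellation-style errors are logged too, then re-raises. A hard kill such as an OOM kill leaves no Python traceback to log, so this only helps when the process is still running Python code at the time of failure.

```python
import functools
import logging
import traceback

logger = logging.getLogger("partition_debug")


def log_failures(fn):
    """Log the full traceback of anything raised by fn, then re-raise it."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException:
            # BaseException also covers cancellation errors and KeyboardInterrupt.
            logger.error("%s failed:\n%s", fn.__name__, traceback.format_exc())
            raise

    return wrapper


@log_failures
def process_partition(rows):
    # Hypothetical per-partition work; replace with the real call.
    return [r * 2 for r in rows]
```

Wrapping the function that runs on each partition keeps the original exception intact while adding a log line that identifies which call failed.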
d
Got it but would there be a traceback that should bubble up?
n
yeah I think there would be
d
I’m not sure if it’s a setting that I’ve missed but I’m not getting anything
16:58:47.349 | INFO    | Task run 'predict_raw_output_task-6' - Finished in state Completed()
16:58:47.373 | INFO    | Task run 'predict_raw_output_task-0' - Finished in state Completed()
16:58:47.415 | INFO    | Task run 'predict_raw_output_task-4' - Finished in state Completed()
16:58:47.428 | INFO    | Flow run 'quartz-raptor' - Finished in state Completed()
16:58:47.435 | ERROR   | Task run 'predict_raw_output_task-3' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.437 | ERROR   | Task run 'predict_raw_output_task-9' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.439 | ERROR   | Task run 'predict_raw_output_task-8' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.464 | ERROR   | Flow run 'enlightened-stingray' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.469 | ERROR   | Flow run 'neon-kittiwake' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.470 | ERROR   | Flow run 'dancing-wildebeest' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.574 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.368 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.369 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.616 | INFO    | prefect.engine - Created flow run 'grumpy-stingray' for flow 'Preview Prompt LF Flow'
16:58:48.912 | INFO    | Flow run 'grumpy-stingray' - Created task run 'predict_raw_output_task-0' for task 'predict_raw_output_task'
16:58:48.913 | INFO    | Flow run 'grumpy-stingray' - Submitted task run 'predict_raw_output_task-0' for execution.
I’ve observed that it’s consistently the last 3 flows that fail this way (I have 14 partitions, so 14 flows running).
But when I switch to SequentialTaskRunner things do seem to work fine, so it could be a memory issue.
n
hrm it might have to do with threading maybe? not sure yet
in 2.x, we ran all tasks in worker threads, which causes weird things
d
I see, I was also under the impression that in 2.x tasks were run with anyio. Does this change in 3.0?
n
we do use anyio worker threads in 2.x, yes
it does change in 3.x, as we run `__call__`ed tasks on the main thread, but `submit`ed or `map`ed tasks in threads via `ThreadPoolExecutor` under the hood (with the default task runner anyways)
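The difference between calling and submitting can be illustrated with the standard library alone. This is a rough sketch of the general idea, not Prefect's internals: `current_thread_name` is a hypothetical helper, and the pool here is a plain `ThreadPoolExecutor` created by hand.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_thread_name() -> str:
    # Report which thread the function body is running on.
    return threading.current_thread().name


# A direct call runs on whatever thread the caller is on.
direct = current_thread_name()

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as pool:
    # Submitted and mapped work runs on pool threads instead.
    submitted = pool.submit(current_thread_name).result()
    mapped = list(pool.map(lambda _: current_thread_name(), range(3)))

print(direct, submitted, mapped)
```

Code that relies on thread-local state or is not thread-safe can behave differently between the two paths, which is one reason a sequential run may succeed where a concurrent one fails.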
d
Got it. We’ll try to upgrade to 3.0 asap since there are a lot of other benefits that we want too. For now, is it safe to assume that running a flow that is triggered by a `dd.compute()` call on a ddf partition is not supported?
n
> for now is it safe to assume that running a flow that is triggered by a `dd.compute()` call on a ddf partition is not supported?
sorry, I'm not sure off the top. if you have an MRE I'd be happy to take a look