Derek
08/27/2024, 4:25 PM
Is it possible to make `dask.compute` calls without using a DaskTaskRunner? We’re just using dask ddfs for lazy evaluation of large databases and aren’t actually spinning up a dask server. When I evaluate the df, it runs a flow on each dask partition. However, the last 3 always fail with:
Crash detected! Execution was cancelled by the runtime environment
I’m not getting any additional error messages that would help debug this, but I’m wondering if it’s a limitation of calling `dask.compute` without using DaskTaskRunner.

Nate
08/27/2024, 4:37 PM
> Is it possible to make `dask.compute` calls without using a DaskTaskRunner?
```python
import dask
import dask.dataframe as dd
import pandas as pd

from prefect import flow

def extract_data():
    data = {'id': [1, 2, 3, 4, 5], 'value': [10, 20, 30, 40, 50]}
    return dd.from_pandas(pd.DataFrame(data), npartitions=1)

def transform_data(df):
    df['value'] = df['value'] * 2
    return df

def load_data(df):
    return df.compute()

@flow
def etl_pipeline():
    return load_data(transform_data(extract_data()))

if __name__ == "__main__":
    final_df = etl_pipeline()
    print(final_df)
```

```
11:36:49.052 | INFO | prefect.engine - Created flow run 'tuscan-rhino' for flow 'etl-pipeline'
11:36:49.052 | INFO | prefect.engine - View at http://0.0.0.0:4200/runs/flow-run/c04bd322-e440-4af2-8a58-82ce9927f09b
11:36:49.122 | INFO | Flow run 'tuscan-rhino' - Finished in state Completed()
   id  value
0   1     20
1   2     40
2   3     60
3   4     80
4   5    100
```
you should be able to just install `dask[distributed]` directly and use it in flows if you want

Nate
08/27/2024, 4:40 PM

Derek
08/27/2024, 4:43 PM
`.compute()` is called, that’s when the flow is executing on the partition

Derek
08/27/2024, 4:44 PM

Derek
08/27/2024, 4:49 PM
`dd.compute()`
(also, any tips on how to root-cause this error message?)

Nate
08/27/2024, 4:50 PM

Derek
08/27/2024, 4:53 PM

Nate
08/27/2024, 4:53 PM
> We’re hoping to upgrade to 3 soon but need to upgrade some dependencies (specifically Pydantic)
makes sense! oh sorry, cause of error
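This is one possible angle on Derek’s question about root-causing the crash. The wording “Execution was cancelled” suggests a cancellation-style exception, though nothing in the thread confirms that. If so, an `except Exception` handler would never see it, because since Python 3.8 `asyncio.CancelledError` subclasses `BaseException` rather than `Exception`. The following is a minimal standard-library sketch of a hypothetical wrapper, `log_base_exceptions`. It logs the thread name and full traceback of anything a per-partition function raises, then re-raises it unchanged:

```python
import asyncio
import functools
import logging
import threading
import traceback

logger = logging.getLogger("partition_debug")  # hypothetical logger name


def log_base_exceptions(fn):
    """Hypothetical debugging wrapper: log the thread and full traceback of
    anything raised, including BaseException subclasses such as
    asyncio.CancelledError that `except Exception` would not catch."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException:
            logger.error(
                "%s raised in thread %s:\n%s",
                fn.__name__,
                threading.current_thread().name,
                traceback.format_exc(),
            )
            raise  # re-raise so the caller still sees the original failure

    return wrapper


@log_base_exceptions
def cancelled_partition():
    # Simulates per-partition work being cancelled out from under the caller.
    raise asyncio.CancelledError()
```

In practice the decorator would go on whatever function dask calls for each partition. `cancelled_partition` only simulates a cancellation for illustration.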
Nate
08/27/2024, 4:54 PM

Derek
08/27/2024, 4:55 PM

Nate
08/27/2024, 4:55 PM

Derek
08/27/2024, 5:00 PM
```
16:58:47.349 | INFO | Task run 'predict_raw_output_task-6' - Finished in state Completed()
16:58:47.373 | INFO | Task run 'predict_raw_output_task-0' - Finished in state Completed()
16:58:47.415 | INFO | Task run 'predict_raw_output_task-4' - Finished in state Completed()
16:58:47.428 | INFO | Flow run 'quartz-raptor' - Finished in state Completed()
16:58:47.435 | ERROR | Task run 'predict_raw_output_task-3' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.437 | ERROR | Task run 'predict_raw_output_task-9' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.439 | ERROR | Task run 'predict_raw_output_task-8' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.464 | ERROR | Flow run 'enlightened-stingray' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.469 | ERROR | Flow run 'neon-kittiwake' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.470 | ERROR | Flow run 'dancing-wildebeest' - Crash detected! Execution was cancelled by the runtime environment.
16:58:47.574 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.368 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.369 | WARNING | api_utils - ARROW_CACHE_ENABLED is not a allowlisted env var for service type engine.
16:58:48.616 | INFO | prefect.engine - Created flow run 'grumpy-stingray' for flow 'Preview Prompt LF Flow'
16:58:48.912 | INFO | Flow run 'grumpy-stingray' - Created task run 'predict_raw_output_task-0' for task 'predict_raw_output_task'
16:58:48.913 | INFO | Flow run 'grumpy-stingray' - Submitted task run 'predict_raw_output_task-0' for execution.
```
I’ve observed that it’s consistently the last 3 flows (I have 14 partitions, so 14 flows are running) that fail with this.

Derek
08/27/2024, 5:01 PM

Nate
08/27/2024, 5:02 PM

Derek
08/27/2024, 5:04 PM

Nate
08/27/2024, 5:09 PM
`__call__`ed tasks on the main thread, but `submit`ed or `map`ed tasks in threads via `ThreadPoolExecutor` under the hood (with the default task runner anyways)

Derek
08/27/2024, 5:30 PM
for now, is it safe to assume that running a flow that is triggered by a `dd.compute()` call on a ddf partition is not supported?

Nate
08/27/2024, 5:32 PM
> for now, is it safe to assume that running a flow that is triggered by a `dd.compute()` call on a ddf partition is not supported?
sorry, I’m not sure off the top of my head. if you have an MRE I’d be happy to take a look