# ask-community
n
Hi Team, I have Prefect Cloud set up on a GKE cluster with Dask Gateway. I would like to leverage all the power of Dask (I'm assigning 12-20 workers, each worker set to have 16GB memory) when reading/writing data to/from BigQuery. I have pandas.read_gbq() or pandas.to_gbq() defined within my tasks, but the execution of the flow seems to be using only one Dask worker instead of distributing the load, and I'm running into memory/timeout issues. I'm getting errors and warnings like this in the GKE Dask worker logs:
INFO - Event loop was unresponsive in Worker for 4.40s. This is often caused by long running GIL holding functions or moving large chunks of data. This can cause timeouts and instability. 

WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.40 GB -- Worker memory limit: 8.59 GB.

WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 6.89 GB -- Worker memory limit: 8.59 GB

WARNING - Worker exceeded 95% memory budget. Restarting.
/opt/conda/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.

distributed.utils_perf - INFO - full garbage collection released 842.28 MB from 584 reference cycles (threshold: 10.00 MB)
distributed.nanny - INFO - Worker process 10 exited with status 1
Do I have to use Prefect's BigQueryTask class if I want the DaskExecutor to utilize all of its workers and cluster options as configured? Or do I have to change something in the Dask Gateway config? How do I tell Prefect to use all the assigned Dask workers when running tasks?
k
Hey @Nivi Mukka, are you mapping pandas.read_gbq()?
n
Not sure if I am. Can you show me an example of what mapping looks like?
k
Like task.map(xyz) inside the Flow?
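Roughly like this, as a minimal Prefect 1.x sketch where the task, flow name, and inputs are all placeholders:

from prefect import task, Flow
from prefect.executors import DaskExecutor

@task
def double(x):
    # Each mapped input becomes its own task run, so the executor
    # can schedule the runs on different Dask workers in parallel.
    return x * 2

with Flow("mapping-example") as flow:
    results = double.map([1, 2, 3, 4])

flow.run(executor=DaskExecutor())  # local test; point this at your Dask Gateway cluster in practice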
n
This is what the code snippet inside my task looks like:
import pandas as pd

df = pd.read_gbq(
    query='select * from {db}.{table}'.format(db=test_db, table=output_file_sr),
    project_id=gcp_project_id,
    credentials=credentials,
)
No, not doing task.map(xyz).
k
Gotcha. I think the issue here is that pandas is single-core by default, so just turning on a DaskExecutor doesn't make that pandas code take advantage of all resources. The Dask cluster has 12-20 workers, but those are separate machines, and a pandas operation can only live on one of them. You need to split the query up in a way that can be parallelized, holding the data in multiple pandas DataFrames on different machines. You can try doing that with Prefect mapping.
n
Do you think this Task will achieve what I want? https://docs.prefect.io/api/latest/tasks/gcp.html#bigquerytask
k
Not really, because it returns a pandas DataFrame, so you are still confined to one core. You have to break that query up somehow or run multiple queries.
n
The read_gbq() code snippet I shared above is part of a task. How can I change that to use mapping?
k
I think it's this query:

df = pd.read_gbq(
    query='select * from {db}.{table}'.format(db=test_db, table=output_file_sr),
    project_id=gcp_project_id,
    credentials=credentials,
)

You need to batch this up. Maybe select by date, so you create one pandas df per date, and then process each date separately?
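Something along these lines, as a rough sketch only. It reuses the names from your snippet (test_db, output_file_sr, gcp_project_id, credentials); the date column, task name, and list of dates are hypothetical, and it assumes your credentials object can be shipped to the Dask workers:

from prefect import task, Flow, unmapped

@task
def load_one_day(day, project_id, credentials):
    import pandas as pd
    # One day's slice of the table; each mapped run builds its own
    # smaller DataFrame on whichever Dask worker it lands on.
    query = "select * from {db}.{table} where date = '{day}'".format(
        db=test_db, table=output_file_sr, day=day
    )
    return pd.read_gbq(query=query, project_id=project_id, credentials=credentials)

with Flow("read-gbq-by-date") as flow:
    days = ["2021-01-01", "2021-01-02", "2021-01-03"]  # however you enumerate your dates
    dfs = load_one_day.map(days, unmapped(gcp_project_id), unmapped(credentials))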
n
I am already using OFFSET and LIMIT to read only 100k rows at a time and this is running inside a loop.
k
Map the offsets, and then you can perform the queries simultaneously across the Dask cluster. Take the OFFSET as a parameter, and map across the different OFFSET values. I just don't know how many concurrent connections to GBQ you can have (but I think it should work).
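Roughly like this (a sketch, again reusing your snippet's names; the order by id column is hypothetical — you need some stable ordering for LIMIT/OFFSET paging to be deterministic — and the 20-page count assumes roughly 2M rows):

from prefect import task, Flow, Parameter, unmapped

@task
def read_page(offset, limit, project_id, credentials):
    import pandas as pd
    # Each mapped offset pulls one page of rows as its own task run,
    # so the pages can land on different Dask workers.
    query = ('select * from {db}.{table} '
             'order by id limit {limit} offset {offset}').format(
        db=test_db, table=output_file_sr, limit=limit, offset=offset
    )
    return pd.read_gbq(query=query, project_id=project_id, credentials=credentials)

with Flow("read-gbq-paged") as flow:
    limit = Parameter("limit", default=100000)
    offsets = [i * 100000 for i in range(20)]  # one entry per page of 100k rows
    pages = read_page.map(offsets, unmapped(limit),
                          unmapped(gcp_project_id), unmapped(credentials))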
n
I see, okay. Will try that out!
Could I try changing the timeout limits in the Dask options instead, so it doesn't end up with KilledWorker due to Lazarus? https://docs.prefect.io/orchestration/concepts/services.html#lazarus
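For example, something like this — these are standard dask.distributed config keys (the 80%/95% thresholds match the worker logs above), but whether setting them client-side actually reaches the Gateway-launched workers depends on the deployment, so this is just a sketch:

import dask

dask.config.set({
    # comm timeouts: how long workers/scheduler wait on connections
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "60s",
    # memory thresholds: defaults are 0.8 (pause) and 0.95 (restart)
    "distributed.worker.memory.pause": 0.90,
    "distributed.worker.memory.terminate": False,  # don't restart the worker on high memory
})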
k
You could do that, but it ultimately doesn't address the issue that a pandas-based approach doesn't inherently take advantage of the Dask cluster. The KilledWorker is likely an OOM issue in this case?
n
Understood. Yes, it is an OOM issue which is leading to timeouts as well.