Nivi Mukka (08/10/2021, 8:08 PM):
I have pandas.read_gbq() or pandas.to_gbq() defined within my tasks, but the execution of the flow seems to be using only one Dask worker instead of distributing the load, and I am running into memory/timeout issues. I am seeing errors and warnings like this in the GKE Dask worker logs:
INFO - Event loop was unresponsive in Worker for 4.40s. This is often caused by long running GIL holding functions or moving large chunks of data. This can cause timeouts and instability.
WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.40 GB -- Worker memory limit: 8.59 GB.
WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 6.89 GB -- Worker memory limit: 8.59 GB
WARNING - Worker exceeded 95% memory budget. Restarting.
/opt/conda/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.
distributed.utils_perf - INFO - full garbage collection released 842.28 MB from 584 reference cycles (threshold: 10.00 MB)
distributed.nanny - INFO - Worker process 10 exited with status 1

Do I have to use Prefect’s BigQueryTask class if I want the DaskExecutor to utilize all of its workers and cluster options as set? Or do I have to change something in the Dask Gateway config? How do I tell Prefect to use all the assigned Dask workers when running tasks?
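
For context, the parallelism here does not come from BigQueryTask or from the gateway config alone: the DaskExecutor can only spread work across workers when the flow contains independent or mapped task runs, and a single task that reads a whole table runs on exactly one worker. Below is a minimal sketch of attaching a DaskExecutor backed by Dask Gateway, assuming Prefect 1.x; the flow name and gateway address are placeholders, not values from this thread.

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("bq-flow") as flow:
    ...  # tasks are defined here

# Placeholder address: the real value depends on the Dask Gateway deployment.
flow.executor = DaskExecutor(
    cluster_class="dask_gateway.GatewayCluster",
    cluster_kwargs={"address": "http://dask-gateway.example:80"},
)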

Kevin Kho:
pandas.read_gbq()?
Are you using task.map(xyz) inside the Flow?
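
For reference, mapping in Prefect 1.x looks roughly like this: each element of the mapped input becomes its own task run, which the DaskExecutor can schedule on any available worker. A minimal sketch with illustrative task names:

from prefect import task, Flow

@task
def get_chunks():
    # Any list works; each element becomes one child task run.
    return [1, 2, 3, 4]

@task
def process(chunk):
    return chunk * 2

with Flow("mapping-example") as flow:
    chunks = get_chunks()
    results = process.map(chunks)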

Nivi Mukka (08/10/2021, 8:13 PM):
import pandas as pd
df = pd.read_gbq(
    query='select * from {db}.{table}'.format(db=test_db, table=output_file_sr),
    project_id=gcp_project_id,
    credentials=credentials,
)

No, I am not using task.map(xyz).

Nivi Mukka (08/10/2021, 8:28 PM):
The read_gbq() code snippet I shared above is part of a task. How can I change that to use mapping?

Kevin Kho:
df = pd.read_gbq(query='select * from {db}.{table}'.format(db=test_db, table=output_file_sr), project_id=gcp_project_id, credentials=credentials)
You need to batch this up. Select by date, maybe, so you create one pandas DataFrame per date. And then process each date separately?
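
A sketch of that suggestion, assuming Prefect 1.x and a date column (here called event_date, which is an assumption, as is the hard-coded date list); test_db, output_file_sr, gcp_project_id, and credentials are the same names used in the snippet above and are presumed defined elsewhere:

import pandas as pd
from prefect import task, Flow

@task
def list_dates():
    # Stand-in for a real query of distinct dates; the values are illustrative.
    return ["2021-08-01", "2021-08-02", "2021-08-03"]

@task
def read_one_date(date):
    query = "select * from {db}.{table} where event_date = '{d}'".format(
        db=test_db, table=output_file_sr, d=date
    )
    return pd.read_gbq(query=query, project_id=gcp_project_id, credentials=credentials)

@task
def process(df):
    ...  # whatever the original task did with the full DataFrame

with Flow("batched-read") as flow:
    dates = list_dates()
    dfs = read_one_date.map(dates)
    process.map(dfs)

Each mapped read then pulls only one date's rows, so no single worker has to hold the entire table in memory.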

Nivi Mukka (08/10/2021, 8:46 PM):
Should I be setting timeout limits in the Dask options instead, so it doesn't end up with KilledWorker due to Lazarus? https://docs.prefect.io/orchestration/concepts/services.html#lazarus
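
If the goal is to fail fast rather than let a worker grind until it hits its memory ceiling, one option is a per-task timeout. A sketch, assuming Prefect 1.x; the task name and the 600-second value are illustrative:

from prefect import task

@task(timeout=600)  # seconds; fails the task run instead of hanging the worker
def read_one_date(date):
    ...

Worker memory limits themselves live on the Dask side; with Dask Gateway they are usually exposed through the gateway's cluster options (often something like worker_memory), but the exact option names depend on how the gateway was deployed.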