Luis Muniz
07/21/2020, 10:04 AM
from prefect import task
from psycopg2 import sql

@task
def get_data(chunk_size):
    # fetch connection
    curr = connection.cursor()
    curr.execute(sql.SQL("select id from big_table"))
    collection = curr.fetchmany(chunk_size)
    while collection:
        # spawn task(collection)  <----- *Here spawn a task*
        collection = curr.fetchmany(chunk_size)
Zachary Hughes
07/21/2020, 12:44 PM
Luis Muniz
07/21/2020, 1:36 PM
Jeremiah
07/21/2020, 1:51 PM
(start_index, end_index) would be sufficient for the next task to load and process the required data; you don’t have to load everything into memory in the first task.
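A minimal sketch of the range-passing idea, assuming `id` is an integer key: the first step only fetches MIN/MAX and turns them into small (start_index, end_index) pairs, and each downstream task (in Prefect, the mapped children) loads just its own slice. The stdlib `sqlite3` module stands in for the Postgres connection so the sketch runs anywhere; `make_demo_db`, `make_ranges`, and `load_range` are hypothetical names, not part of Prefect or psycopg2.

```python
import sqlite3


def make_demo_db(n):
    """Hypothetical stand-in for big_table: an in-memory table with ids 1..n."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO big_table (id) VALUES (?)", [(i,) for i in range(1, n + 1)]
    )
    return conn


def make_ranges(min_id, max_id, chunk_size):
    """Split the id span [min_id, max_id] into half-open (start_index, end_index) pairs."""
    return [
        (start, min(start + chunk_size, max_id + 1))
        for start in range(min_id, max_id + 1, chunk_size)
    ]


def load_range(conn, start_index, end_index):
    """Per-chunk work: load only the rows whose id falls in [start_index, end_index)."""
    rows = conn.execute(
        "SELECT id FROM big_table WHERE id >= ? AND id < ? ORDER BY id",
        (start_index, end_index),
    ).fetchall()
    return [row[0] for row in rows]


if __name__ == "__main__":
    conn = make_demo_db(10)
    # The first task only needs the bounds, not the rows themselves.
    min_id, max_id = conn.execute("SELECT MIN(id), MAX(id) FROM big_table").fetchone()
    for start_index, end_index in make_ranges(min_id, max_id, 4):
        print(start_index, end_index, load_range(conn, start_index, end_index))
```

Only the pairs cross task boundaries, so the first task's memory use no longer grows with the table. Gaps in the id sequence just make some chunks smaller.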
On the other hand, if you want transactional semantics and a shared cursor, you must use a single task, as there’s no way to enforce that across multiple dynamic tasks.
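For the single-task case, here is a sketch of the loop from the question with the per-chunk work done in place instead of spawned. `sqlite3` again stands in for the real database, and `make_demo_db` and `process_with_shared_cursor` are hypothetical names. With psycopg2's default (non-autocommit) connection, the same loop would run inside the one transaction opened by the first `execute`.

```python
import sqlite3


def make_demo_db(n):
    """Hypothetical stand-in for big_table: an in-memory table with ids 1..n."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO big_table (id) VALUES (?)", [(i,) for i in range(1, n + 1)]
    )
    return conn


def process_with_shared_cursor(conn, chunk_size):
    """Scan the table in chunks through one cursor, doing the per-chunk work inline."""
    chunk_sizes = []
    cur = conn.cursor()
    cur.execute("SELECT id FROM big_table ORDER BY id")
    chunk = cur.fetchmany(chunk_size)
    while chunk:  # fetchmany returns an empty list once the result set is exhausted
        # Stand-in for the real processing that would otherwise be a spawned task.
        chunk_sizes.append(len(chunk))
        chunk = cur.fetchmany(chunk_size)
    cur.close()
    return chunk_sizes


if __name__ == "__main__":
    print(process_with_shared_cursor(make_demo_db(10), 4))
```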
The key thing about map() is that you’re telling the flow to expect the map step in advance; we don’t support this from inside a task because the flow has already been compiled and submitted to Dask for execution.
LOOP was introduced specifically for the case where you are iterating over a cursor in sequence and mapping is not useful (because you can’t take advantage of parallelism).
Luis Muniz
07/21/2020, 2:16 PM
start_index is usable only if the result set is ordered.
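Both points meet in the LOOP pattern: Prefect's LOOP signal re-runs the same task and hands it the previous iteration's result (in Prefect 0.x, read back via `prefect.context.get("task_loop_result")`). The sketch below imitates that control flow in plain Python so it runs without Prefect. `Loop`, `run_looped`, and `process_next_chunk` are hypothetical stand-ins, not Prefect's API. The loop state is the last id seen, which is a usable start index only because the query is ordered by id.

```python
import sqlite3


class Loop(Exception):
    """Hypothetical stand-in for Prefect's LOOP signal: carries the next iteration's state."""

    def __init__(self, result):
        super().__init__("loop")
        self.result = result


def run_looped(fn, conn):
    """Re-run fn with the previous iteration's result until it returns normally."""
    state = None
    while True:
        try:
            return fn(conn, state)
        except Loop as signal:
            state = signal.result


def make_demo_db(n):
    """Hypothetical stand-in for big_table: an in-memory table with ids 1..n."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO big_table (id) VALUES (?)", [(i,) for i in range(1, n + 1)]
    )
    return conn


def process_next_chunk(conn, state, chunk_size=3):
    # state is (last_id, running_total) from the previous iteration, or None on the first run.
    last_id, total = state if state is not None else (0, 0)
    # Keyset pagination: resume strictly after the last id seen; needs ORDER BY id.
    rows = conn.execute(
        "SELECT id FROM big_table WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, chunk_size),
    ).fetchall()
    if not rows:
        return total  # no more rows: finish normally instead of looping again
    total += sum(row[0] for row in rows)  # stand-in for real per-chunk processing
    raise Loop((rows[-1][0], total))


if __name__ == "__main__":
    print(run_looped(process_next_chunk, make_demo_db(10)))
```

Unlike the shared-cursor version, each iteration issues a fresh query, so only the small (last_id, total) state has to survive between runs.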