Luis Muniz 07/21/2020, 10:04 AM
@task
def get_data(chunk_size):
    # fetch connection
    curr = connection.cursor()
    curr.execute(sql.SQL("select id from big_table"))
    collection = curr.fetchmany(chunk_size)
    while collection:
        # spawn task(collection)  <----- *Here spawn a task*
        collection = curr.fetchmany(chunk_size)
Zachary Hughes 07/21/2020, 12:44 PM
Luis Muniz 07/21/2020, 1:36 PM
Jeremiah 07/21/2020, 1:51 PM
would be sufficient for the next task to load and process the required data; you don’t have to load everything into memory in the first task. On the other hand, if you want transactional semantics and a shared cursor, you must use a single task, as there’s no way to enforce that across multiple dynamic tasks. The key thing about `map` is that you’re telling the flow to expect the map step in advance; we don’t support this from inside a task because the flow has already been compiled and submitted to Dask for execution.
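A minimal sketch of that split, in plain Python rather than Prefect: a first step returns only chunk boundaries, and each chunk is then loaded independently, which is the shape a map step needs. Only `big_table` comes from the question; `make_demo_db`, `plan_chunks`, `load_chunk`, `run_demo`, the sqlite3 file, and the thread pool standing in for mapped tasks are all assumptions for illustration. Each chunk opens its own connection, so nothing here gives you a shared cursor or a single transaction.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor


def make_demo_db(path, n_rows):
    """Create a throwaway big_table with ids 1..n_rows (demo data only)."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
    conn.executemany(
        "INSERT INTO big_table (id) VALUES (?)",
        [(i,) for i in range(1, n_rows + 1)],
    )
    conn.commit()
    conn.close()


def plan_chunks(path, chunk_size):
    """First step: return (lo, hi) id ranges, not the rows themselves."""
    conn = sqlite3.connect(path)
    try:
        lo, hi = conn.execute("SELECT MIN(id), MAX(id) FROM big_table").fetchone()
    finally:
        conn.close()
    if lo is None:  # empty table: nothing to map over
        return []
    return [
        (start, min(start + chunk_size - 1, hi))
        for start in range(lo, hi + 1, chunk_size)
    ]


def load_chunk(path, bounds):
    """Mapped step: load one id range using its own connection."""
    lo, hi = bounds
    conn = sqlite3.connect(path)
    try:
        rows = conn.execute(
            "SELECT id FROM big_table WHERE id BETWEEN ? AND ? ORDER BY id",
            (lo, hi),
        ).fetchall()
    finally:
        conn.close()
    return [row[0] for row in rows]


def run_demo(n_rows=10, chunk_size=4):
    """Plan the chunks up front, then process them independently."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        make_demo_db(path, n_rows)
        chunks = plan_chunks(path, chunk_size)
        with ThreadPoolExecutor(max_workers=4) as pool:
            results = list(pool.map(lambda b: load_chunk(path, b), chunks))
    return chunks, results
```

The list returned by `plan_chunks` is known before any chunk is processed, which is why this shape can be declared ahead of time. If the ids have gaps, the ranges stay correct but the chunks become uneven in size.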
`LOOP` was introduced specifically for the case where you are iterating over a cursor in sequence and mapping is not useful (because you can’t take advantage of parallelism).
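The sequential case can be sketched in plain Python as a loop in which each iteration hands a small piece of state (here, the last id seen) to the next one. This uses keyset pagination and an in-memory sqlite3 table as stand-ins; `make_demo_conn`, `fetch_after`, and `iterate_in_sequence` are hypothetical names for illustration, not Prefect API, and the sketch assumes ids are positive integers.

```python
import sqlite3


def make_demo_conn(ids):
    """Build an in-memory big_table holding the given ids (demo data only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
    conn.executemany("INSERT INTO big_table (id) VALUES (?)", [(i,) for i in ids])
    conn.commit()
    return conn


def fetch_after(conn, last_id, chunk_size):
    """Return up to chunk_size ids strictly greater than last_id, in order."""
    rows = conn.execute(
        "SELECT id FROM big_table WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, chunk_size),
    ).fetchall()
    return [row[0] for row in rows]


def iterate_in_sequence(conn, chunk_size, handle):
    """Process chunks one after another, carrying the last id forward."""
    last_id = 0  # assumption: ids are positive integers
    n_chunks = 0
    while True:
        chunk = fetch_after(conn, last_id, chunk_size)
        if not chunk:
            return n_chunks
        handle(chunk)
        last_id = chunk[-1]  # the only state the next iteration needs
        n_chunks += 1
```

Each iteration depends on the previous one's `last_id`, so the chunks cannot run in parallel. The `ORDER BY id` is what makes the carried-over id a valid resume point.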
Luis Muniz 07/21/2020, 2:16 PM
is usable only if the resultset is ordered.