
Luis Muniz

07/21/2020, 10:04 AM
Hi guys, we are trying to iterate over the cursor of a long-running query, chunk the results, and dynamically spawn a task to handle each chunk. The examples I can find in the documentation use a declarative approach: either defining the task graph inside the Flow DSL with the @task decorator, or using an explicit Task instance, but again inside the Flow itself. What we would like, inside a Task that is handling a scrollable result set, is to be able to dynamically spawn an undefined number of tasks, because the total size of the result set is unknown. I hope I have been able to frame our use case properly. Here's a bit of pseudo-code to illustrate it:
@task
def get_data(chunk_size):
    # fetch a connection (assumes a DB-API connection, e.g. psycopg2)
    curr = connection.cursor()
    curr.execute(sql.SQL("select id from big_table"))
    collection = curr.fetchmany(chunk_size)
    while collection:
        # spawn task(collection)  <----- *here spawn a task*
        collection = curr.fetchmany(chunk_size)

Zachary Hughes

07/21/2020, 12:44 PM
Hi @Luis Muniz! If you don't need dynamic tasks specifically, I'd recommend looping for this scenario. Having one looped task would allow you to share one cursor and iterate as long as you'd like. If you're looking specifically to work with dynamic tasks, the approach I'd recommend is creating a task that loads the number of expected chunks you'll be processing, then mapping over those chunks. https://docs.prefect.io/core/advanced_tutorials/task-looping.html#dynamic-dags-task-looping
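[Editor's note: a minimal sketch of the looping approach from the linked docs, using Prefect Core's LOOP signal. State is carried between iterations via the LOOP result rather than a shared cursor; get_connection and handle_chunk are hypothetical stand-ins for your own connection and processing code.]

import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def process_in_chunks(chunk_size):
    # each raise of LOOP re-runs this task; prior state arrives via context
    loop_payload = prefect.context.get("task_loop_result", {})
    offset = loop_payload.get("offset", 0)

    curr = get_connection().cursor()  # hypothetical helper
    curr.execute(
        "select id from big_table order by id offset %s limit %s",
        (offset, chunk_size),
    )
    rows = curr.fetchall()
    if not rows:
        return offset  # no more rows: end the loop, return total processed
    handle_chunk(rows)  # hypothetical processing step
    raise LOOP(
        message=f"processed {offset + len(rows)} rows",
        result={"offset": offset + len(rows)},
    )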

Luis Muniz

07/21/2020, 1:36 PM
i see
to do that we would need to fetch the count of the items our query is going to return and create a fake collection. This looks too risky, because we may miss rows
So there is no way to programmatically spawn tasks, basically do what the flow does in map(), but from a task?
And of course one of the main reasons for using prefect is to be able to parallelize these things. If we have to load a resultset of millions of rows into memory, what good is all this setup of dask and kubernetes and building DAGs? Just put everything in a script and scale vertically until you can't anymore
So I would say, yes, we would like to be able to use dynamic tasks, that's one of the big selling points of prefect
ah but wait, the link you provided actually gives us a way to spawn tasks using the LOOP signal

Jeremiah

07/21/2020, 1:51 PM
@Luis Muniz you’ll need to design your flow to take advantage of the semantic you prefer. If you want to have dynamic tasks, then you’ll need to create a task that returns all the items to map over. Note that returning a (start_index, end_index) pair would be sufficient for the next task to load and process the required data; you don’t have to load everything into memory in the first task. On the other hand, if you want a transactional semantic and a shared cursor, you must use a single task, as there’s no way to enforce that across multiple dynamic tasks. The key thing about map() is that you’re telling the flow to expect the map step in advance; we don’t support this from inside a task because the flow has already been compiled and submitted to Dask for execution. LOOP was introduced specifically for the case where you are iterating over a cursor in sequence and mapping is not useful (because you can’t take advantage of parallelism).
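[Editor's note: a sketch of the mapping approach Jeremiah describes. A first task returns (start_index, end_index) pairs, so only row ranges are materialized, never the rows themselves; a mapped task then loads and processes each range. This assumes the resultset is stably ordered; get_connection and handle_chunk are again hypothetical stand-ins.]

from prefect import task, Flow

@task
def get_chunk_ranges(chunk_size):
    # count rows once, then emit lightweight (start, end) index pairs
    curr = get_connection().cursor()
    curr.execute("select count(*) from big_table")
    total = curr.fetchone()[0]
    return [(start, min(start + chunk_size, total))
            for start in range(0, total, chunk_size)]

@task
def process_range(chunk_range):
    # each mapped run loads only its own slice of the resultset
    start, end = chunk_range
    curr = get_connection().cursor()
    curr.execute(
        "select id from big_table order by id offset %s limit %s",
        (start, end - start),
    )
    handle_chunk(curr.fetchall())

with Flow("chunked-query") as flow:
    ranges = get_chunk_ranges(chunk_size=10_000)
    process_range.map(ranges)  # one dynamic task per range, runnable in parallel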

Luis Muniz

07/21/2020, 2:16 PM
thanks for the clarification, start_index is usable only if the resultset is ordered
this already gives me something to think about, and to see if the performance is worth the complexity tradeoff
But if we want to do it properly, it looks like it would mean implementing flatmap
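[Editor's note: on the flatmap point, Prefect Core 0.12+ ships a flatten helper for mapping over the concatenation of list-of-list task results. A minimal sketch; make_batches and process_item are hypothetical tasks.]

from prefect import task, Flow, flatten

@task
def make_batches():
    # each element is itself a list of items
    return [[1, 2], [3, 4, 5]]

@task
def process_item(item):
    return item * 2

with Flow("flat-map") as flow:
    batches = make_batches()
    # flatten concatenates the sublists so map sees one item per task run
    process_item.map(flatten(batches))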