Bertrand GERVAIS

04/21/2020, 4:41 PM
Hi, I am trying to implement a geographical data processing workflow that works on datasets that don't fit into memory, and I can't find a properly documented way to perform parallel computation on each object in my datasets. I would like to avoid returning all objects as the result of a task just to map a function over them. Instead, I would like to read data inside a task and dispatch objects (either through a queue or some list containing a reasonable number of objects) for parallel processing, until all the data has been read and processed (with results dumped into a DB or file, to be handled by a later task), and I can't find a way to do this with Prefect. It seems that in order to parallelize computation on objects with the map function, I need to return all of them, which I can't.

I have tried using the executor inside a task to call its map function, which works fine and could suit my needs. But I am not sure whether this is the right way to do it, or whether it can be considered an official feature that you will keep supporting. In your view, what is the best way to handle this?

Zachary Hughes

04/21/2020, 4:47 PM
Hi @Bertrand GERVAIS, taking a look now! Will be back to you with a recommendation ASAP.

Jim Crist-Harif

04/21/2020, 4:58 PM
Could you instead return a list of references to the data? For example, if you have 1000 files, you don't need to read them all and return the results; you can map over the list of file names instead.
Alternatively, you might have a Prefect task that schedules your computation on the underlying dask cluster (assuming you're using the dask executor). I recommend using dask.distributed.Client directly, rather than relying on Prefect's executor inside the task, as that interface is less public.

Bertrand GERVAIS

04/22/2020, 6:19 AM
I want to avoid returning a list of references to the data, as I am using a generic data reader (the OGR library) and my source data can be local files, a DB, or data served by web services. OGR gives me a good abstraction for reading all of these transparently, and I don't want to lose that by using Prefect.
Yes, I plan to use the dask executor. How can I call the client inside a task? Do I need to set up the object inside each task?
So far, after trying Airflow and using Dask in the past, I am almost 100% convinced Prefect has a great paradigm, but not offering a clear solution for parallel computing on very large datasets feels like a blind spot. For geographical data processing, the size of the datasets is often more of an issue than the computation time on each object.

Jim Crist-Harif

04/22/2020, 5:22 PM
I want to avoid returning a list of references to the data, as I am using a generic data reader (the OGR library) and my source data can be local files, a DB, or data served by web services. OGR gives me a good abstraction for reading all of these transparently, and I don't want to lose that by using Prefect.
Clarifying, just to ensure we're not talking past each other. Frequently when I see workloads like this, operations can be broken down into two steps:
• List references to all the objects you want to process (usually fast, doesn't require reading the data). This may be a list of files, a list of keys in the DB, etc.
• Do some work on each object. This task would take the reference and use the underlying library (OGR) to read the data in and process it.
If your workload fits this, then Prefect's map should work:
from prefect import task

@task
def items_to_process():
    # Fast: enumerate references (file names, DB keys, ...) without reading the data
    return list_of_keys_to_process

@task
def process_single_item(key):
    # Read and process a single object, given its reference
    data = read_in_data_from_key(key)
    do_some_work(data)
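Wiring those two tasks together would look something like this (a minimal sketch; flow.run() runs locally unless you pass an executor):

from prefect import Flow

with Flow("process-items") as flow:
    keys = items_to_process()
    # map spawns one task run per key, which the executor can run in parallel
    results = process_single_item.map(keys)

flow.run()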
Of course not all data sources can support this model efficiently. Reading from a gzip file is a good example: there's no efficient way to randomly access a single object, since the file has to be decompressed up to that object each time. I'm not familiar with OGR, but if your workload doesn't easily fit the above, then I recommend having a single Prefect task kick off a dask job (running on the same cluster) which handles the parallelization you'd want.
We're likely going to make this easier in the future, but for now I believe the following should work in all situations (provided you're running Prefect on a distributed dask cluster):
from dask.distributed import get_client
from prefect import task

def inc(x):
    return x + 1

@task
def your_dask_processing_step():
    # get_client() returns a client connected to the dask cluster this task runs on
    client = get_client()
    # do dask things in here, for example:
    futures = client.map(inc, [1, 2, 3, 4, 5])
    results = client.gather(futures)  # block until all futures have finished
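To actually run a flow containing this task against your cluster, you'd attach a DaskExecutor pointed at your scheduler, something like this (the address below is a placeholder for your own scheduler's):

from prefect.engine.executors import DaskExecutor

# assuming `flow` is a Flow containing the task above
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))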
Please let me know if you have any questions/run into issues. I'm hoping to expand dask integrations in Prefect over the next few months, so this should get nicer over time.

Bertrand GERVAIS

04/23/2020, 9:12 AM
Thanks for the additional information. I don't think listing references would suit my needs: I basically have a few files encoded as GeoPackage (essentially a SQLite DB), or sometimes PostgreSQL tables, and I want to split the processing over a table. I am not very keen on having many workers access this DB so that each of them works on some items in the table. Do you maybe have a better solution for this?
Thanks for the dask.distributed solution. I tried that yesterday but ran into issues with having two dask schedulers on my computer (one for Prefect, the other for parallel computation inside tasks). I will try your solution and tell you how it works for my case. Thanks!
Alright, it works fine by calling from dask.distributed import get_client.
I am very interested in the future work you will be doing on this!