Hans Lellelid

07/14/2023, 8:02 PM
Hi folks -- I have a "Best Practices" question. If this is better put on the Discourse forum, I can do that; just let me know. Basically, I'm trying to take advantage of task parallelization to speed up parsing some very large files.
• We have a bunch of 8GB files of JSONL data in S3.
• We have a flow that takes a glob pattern, pulls down all the matching files, and calls a task (`process_file.submit(path)`) to process each file. My understanding is that this lets agents do that part in parallel for however many files match the glob pattern.
• I would also like to parallelize the processing of batches of rows within these files, but having the `process_file` task call another task using `process_batch.submit()` results in an error about tasks not being callable outside of flows.
So, questions:
1. What's the best way to architect this [ideally without reinventing the worker/concurrency machinery]?
2. Are we pushing Prefect beyond what it was envisioned for by implementing our pipeline directly in flows/tasks like this?

Dominic Tarro

07/14/2023, 8:15 PM
You can try an orchestrator/worker pattern. It requires two flows:
1. The flow that globs all of the file paths and then sends them out to be processed in parallel (orchestrator).
2. The flow that processes each file (worker).
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
def worker_flow(path: str):
    """Do stuff to an individual file."""
    # Load the data from the path
    data = ...
    # Do your individual file processing
    ...


@flow
async def orchestrator_flow():
    """Submit a worker flow run for each globbed path."""
    paths = ...
    # run_deployment expects "<flow-name>/<deployment-name>"; by default the
    # flow name is the function name with underscores replaced by hyphens.
    results = await asyncio.gather(
        *[
            run_deployment("worker-flow/deployment-name", parameters=dict(path=path))
            for path in paths
        ]
    )
    return results
It requires you to create a deployment for the worker.

Hans Lellelid

07/14/2023, 8:17 PM
Interesting, ok! Thank you. I was continuing to poke around and stumbled on the obvious-in-retrospect idea of using a subflow. I was wondering whether a subflow map would be another solution for this, or whether this model of calling into a worker is better/different.

Dominic Tarro

07/14/2023, 8:21 PM
That's entirely valid as well. It depends on how much memory your infrastructure has: if you process 8 files concurrently and load each one fully, expect to need at minimum 64GB of available memory. Workarounds include:
1. Downloading the file to disk and then processing it as you iterate over the lines.
2. Streaming N lines from the S3 object at a time.
3. Applying a semaphore that limits the number of subflows running at any given moment.
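A minimal sketch of workaround 3 using plain `asyncio.Semaphore` (a standard-library technique, not a Prefect feature). `process_file` is a hypothetical stand-in for awaiting a per-file subflow or a `run_deployment` call, and `MAX_CONCURRENT_FILES = 2` is an assumed cap. With that cap and fully loaded files, peak memory would be roughly 2 × 8GB = 16GB instead of 64GB.

```python
import asyncio

# Assumed cap for illustration: at most this many files in flight at once.
MAX_CONCURRENT_FILES = 2


async def process_file(path: str) -> str:
    """Hypothetical stand-in for awaiting a per-file subflow or run_deployment."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    return f"processed {path}"


async def process_all(
    paths: list[str], limit: int = MAX_CONCURRENT_FILES
) -> tuple[list[str], int]:
    """Process every path, but never more than `limit` at the same time.

    Returns the results (in input order) and the peak number of files in flight.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(path: str) -> str:
        nonlocal in_flight, peak
        # Only `limit` coroutines can hold the semaphore at once;
        # the others wait here until a slot frees up.
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await process_file(path)
            finally:
                in_flight -= 1

    # gather() returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(*(guarded(p) for p in paths))
    return list(results), peak


if __name__ == "__main__":
    results, peak = asyncio.run(process_all([f"file-{i}.jsonl" for i in range(8)]))
    print(f"{len(results)} files processed, at most {peak} at a time")
```

The semaphore only bounds how many files are in flight; all of them are still scheduled up front, so the orchestrating code itself stays small.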

Hans Lellelid

07/14/2023, 8:21 PM
ok, gotcha; that's helpful. Yes, I'm streaming these files from s3 into temp files.

Dominic Tarro

07/14/2023, 8:21 PM
Are you using `prefect_aws.ECSTask` for your infra?

Hans Lellelid

07/14/2023, 8:21 PM
I'm actually using MinIO in local infrastructure. I was just saying S3 for simplicity. Maybe that was a bad idea 🙂
So I'm processing the files in chunks to avoid needing to load them into memory. I just wanted to support processing multiple files concurrently and then processing the batches of records within those files concurrently. (Thinking 10k-record batches, but unsure whether Prefect will get angry about data of that size being shuttled around between agents?)
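A sketch of reading JSONL in fixed-size batches with only the standard library, assuming the file has already been streamed down to a local temp file as described. `iter_batches` and `BATCH_SIZE` are illustrative names, not part of Prefect.

```python
import itertools
import json
from typing import Iterable, Iterator

BATCH_SIZE = 10_000  # the 10k-record batch size floated above


def iter_batches(lines: Iterable[str], batch_size: int = BATCH_SIZE) -> Iterator[list[dict]]:
    """Yield lists of parsed JSONL records, at most `batch_size` per list.

    `lines` can be an open file object; it is consumed lazily, so only one
    batch of parsed records is held in memory at a time.
    """
    # Parse lazily and skip blank lines (e.g. a trailing newline).
    records = (json.loads(line) for line in lines if line.strip())
    while True:
        # islice pulls at most batch_size records from the shared generator.
        batch = list(itertools.islice(records, batch_size))
        if not batch:
            return
        yield batch
```

Usage would look something like `with open(tmp_path, encoding="utf-8") as f: for batch in iter_batches(f): ...`, where `tmp_path` is a hypothetical local path. Inside a flow or subflow (not inside a task), each batch could then be passed to `process_batch.submit(batch)`.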

Dominic Tarro

07/14/2023, 8:24 PM
Both solutions work to achieve that objective. What do you mean by "shuttled around between agents"?

Hans Lellelid

07/14/2023, 8:24 PM
So, when I invoke a task, I assume Prefect serializes the parameters [the batch of records, in this case] to send them wherever the task is running?
[or are flows always executed on a single agent?]
I'm using DaskTaskRunner, but not particularly wedded to that.

Dominic Tarro

07/14/2023, 8:25 PM
I haven't used the DaskTaskRunner, but the data would definitely need to be serialized to be passed between processes.
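To get a feel for the serialization cost before committing to 10k-record batches, one rough check is to pickle a representative batch and measure it. This sketch uses stdlib `pickle` as a proxy; the serializer Dask or Prefect actually uses may produce different sizes. `make_fake_batch` generates hypothetical records rather than the real data, and the file path is made up.

```python
import pickle


def serialized_size(obj: object) -> int:
    """Bytes needed to pickle `obj`: a rough proxy for the cost of shipping it."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def make_fake_batch(n: int) -> list[dict]:
    """Hypothetical records shaped loosely like parsed JSONL rows."""
    return [{"id": i, "name": f"record-{i}", "value": i * 0.5} for i in range(n)]


if __name__ == "__main__":
    batch = make_fake_batch(10_000)
    batch_bytes = serialized_size(batch)
    # Alternative payload: a reference (hypothetical path + line range)
    # instead of the records themselves.
    ref_bytes = serialized_size(("/tmp/part-0001.jsonl", 0, 10_000))
    print(f"batch of {len(batch)} records: {batch_bytes:,} bytes pickled")
    print(f"reference to the same rows: {ref_bytes:,} bytes pickled")
```

If the payload turns out to be large, shipping a small reference like the path-plus-range tuple is one option, at the cost of every worker needing access to the file itself.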

Hans Lellelid

07/14/2023, 8:26 PM
Yes, right; ok, that makes sense. We'll give this a shot and see how it performs on a smaller dataset.

Dominic Tarro

07/14/2023, 8:26 PM
Unless there's some sort of shared memory going on. Haven't touched Dask, so idk.

Hans Lellelid

07/14/2023, 8:26 PM
Yeah, ok. No problem -- I assumed there was serialization. We're not obsessing over that part yet. If it's too slow, we'll revisit the architecture.
Thanks for the tips!

Dominic Tarro

07/14/2023, 8:27 PM
No problem, best of luck