# prefect-community
i
Hello! I want to build a flow in which several components need to load a heavy model before they do their work. The model takes more than two minutes to load, so I'd like some sort of stateful component that keeps the model loaded in memory and only has to call .predict() when data arrives. I've seen that “stateful components” do not exist, and that I shouldn't rely on a component's state. Is there any guideline or best practice for achieving this? Or maybe Prefect is not intended for this and components can't be this heavy? Thank you
m
Assuming these flow tasks use the same underlying model, I see two potential solutions:
• create a predictions API to which you can send requests to compute predictions
• create a task to load the model and use this task result (the loaded model) as inputs of the other tasks that require the model
Would any of these work for you?
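A minimal sketch of the second option (load once, pass the loaded model downstream), written in plain Python rather than against the Prefect API. `HeavyModel`, `load_model`, and `predict_task` are hypothetical names used only for illustration; in a real flow, `load_model` would be its own task and its result would be passed as an input to the other tasks.

```python
import time

LOAD_COUNT = 0  # counts how many times the expensive load actually runs


class HeavyModel:
    """Hypothetical stand-in for a model that is slow to load."""

    def __init__(self):
        global LOAD_COUNT
        LOAD_COUNT += 1
        time.sleep(0.01)  # pretend this is the multi-minute load

    def predict(self, text):
        return text.upper()


def load_model():
    # In a flow, this would be the single "load" step.
    return HeavyModel()


def predict_task(model, item):
    # Downstream steps receive the already-loaded model as an input.
    return model.predict(item)


model = load_model()
results = [predict_task(model, item) for item in ["a", "b", "c"]]
```

Whether this stays cheap in practice depends on how the executor moves task results between workers: if the model has to be serialized and shipped to another process, the saving from loading once may shrink.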
upvote 1
a
Matthias's answer is excellent. If you still have questions, could you explain your use case in more detail rather than in terms of features? If your problem is serving ML models, I'd agree that Prefect is not directly intended for this. It would be better to build a custom API or use a dedicated tool for that. But if you want to, e.g., trigger a training process and generate your model on a schedule or via an API call, and then trigger a series of actions once training has finished (orchestration), then this is a very good use case for Prefect. And for statefulness, you could do something as simple as dumping your pickled model to S3.
m
> If your problem is serving ML models, I'd agree that Prefect is not directly intended for this. It would be better to build a custom API or use some dedicated tool for that
Unless of course, you do batch predictions (in which case Prefect is the perfect tool to do that)
💯 1
a
@Matthias, great point! By model serving I meant a service serving real-time predictions
i
@Matthias Thank you for the suggestions. Yes, the underlying model is the same. The only downside is that it is very heavy: it is an OCR model of 200~300 MB, and I also plan to use even heavier models (transformers), which can be 2~4 GB. I want to build an ETL that takes a batch of documents from a DB (or one document from an API call to some endpoint), runs OCR on it with a heavy OCR model, and then makes some predictions using another model. My plan is to have 3 to 5 OCR objects loaded in memory that can operate at the same time (one OCR object can't process more than one page at a time). The process_doc task is the one that maps ocr.process_page() over each document's pages, so the more OCR objects I can load into memory, the more parallelism I get. In short, I want a full end-to-end document processing pipeline that parallelizes at both document level and page level, because the unit operation is to OCR and classify each page of a document.
from prefect import task

@task
def process_doc(doc):
    # Problem: this unnecessarily loads the model every time the task is called
    ocr_engine = OCREngine()
    for page in doc:
        print(f'Processing page `{page.page_num}` from doc `{doc.name}`')
        ocr_engine.process_page(page)
    return doc
As you can see, `ocr_engine = OCREngine()` is being instantiated inside the task. This is suboptimal because it gets loaded every time the task is called. I'd like to have a pool of `N` `ocr_engines` that work in parallel, and the tasks are the `process_page` of each of the instantiated `ocr_engines`.
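The pool of `N` engines described above could look roughly like this in plain Python. This is only a sketch using the standard library, not Prefect: `FakeOCREngine` is a hypothetical stand-in for `OCREngine`, and `N_ENGINES` and the page names are made up. Each engine is loaded once, and a worker borrows a free engine for exactly one page at a time.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class FakeOCREngine:
    """Hypothetical stand-in for OCREngine; one instance handles one page at a time."""

    def __init__(self, engine_id):
        self.engine_id = engine_id

    def process_page(self, page):
        return f"text of {page}"


N_ENGINES = 3

# Load every engine once, up front, and park them in a thread-safe queue.
engine_pool = queue.Queue()
for i in range(N_ENGINES):
    engine_pool.put(FakeOCREngine(i))


def process_page(page):
    engine = engine_pool.get()  # blocks until an engine is free
    try:
        return engine.process_page(page)
    finally:
        engine_pool.put(engine)  # hand the engine back for the next page


pages = [f"doc{d}-page{p}" for d in range(2) for p in range(4)]
with ThreadPoolExecutor(max_workers=N_ENGINES) as executor:
    texts = list(executor.map(process_page, pages))
```

Threads only give real parallelism here if the OCR library releases the GIL while it works; otherwise the same borrow-and-return idea would need one engine per worker process instead.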
a
Perhaps you can check the resource manager? I can share the link to the docs and some examples
i
I will take a look, thank you!
Also, related: is there any way to do nested maps? E.g., if I have 100 docs and each doc has 20 pages, I have to process 2000 pages. So I either have to call 100 times a flow that processes only one doc (and parallelizes over its pages), or call once a flow that parallelizes over the docs, but then I can't further parallelize the pages inside that task. Is there any way of doing a .map() inside a task function?
a
Alternatively, you can flatten your list. This flat-mapping seems appropriate to your use case https://docs.prefect.io/core/concepts/mapping.html#flat-mapping
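To illustrate the flattening idea in plain Python (this is not the Prefect flat-mapping API from the linked docs, just the underlying shape): the nested docs-to-pages structure becomes one flat list of work items, a single map runs over it, and the results are regrouped by document afterwards. The document names, page names, and `process_page` below are hypothetical.

```python
from collections import defaultdict

# Hypothetical input: 3 documents with 2 pages each.
docs = {f"doc{d}": [f"page{p}" for p in range(2)] for d in range(3)}

# Flatten the nested structure into one list of (doc_name, page) work items.
work_items = [(name, page) for name, pages in docs.items() for page in pages]


def process_page(item):
    name, page = item
    return name, f"{name}/{page}: processed"


# A single map over the flat list replaces the nested doc -> page loops.
processed = list(map(process_page, work_items))

# Regroup the page results by document.
by_doc = defaultdict(list)
for name, result in processed:
    by_doc[name].append(result)
```

With 100 docs of 20 pages each, `work_items` would hold all 2000 pages, so one level of mapping covers both the document and the page dimension.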
k
A two-stage map runs into problems technically because a map operation distributes the work across the available hardware, so the first map already uses all of it. Calling map on the already mapped processes would try to use all available hardware again for each child process from the first map. This leads to resource contention, and Dask doesn’t allow this kind of operation.
👍 1