Dmytro Kostochko
06/10/2021, 1:34 PM
I have a flow that needs to detect new documents (detect_new_documents), load the data needed for extraction (get_skill_extractor), and save the results (save_skills). This extractor data should also be periodically reloaded from the database if a new version was added.
I use a flow like this:

from prefect import Flow, unmapped

with Flow("Analyze new documents") as flow:
    documents = detect_new_documents()
    extracted_skills = extract_skills.map(documents, unmapped(get_skill_extractor()))
    save_skills.map(documents, extracted_skills)
It looks like I need some stateful worker that keeps an instance of skill_extractor in memory and shares it across all tasks, because right now it is shipped to the Dask workers along with each task, and serializing/deserializing it takes more time than the processing itself.
Maybe someone can give me some advice on how this can be implemented with Prefect?

Kevin Kho
What is skill_extractor here? Is it a function or data?

Dmytro Kostochko
06/10/2021, 1:59 PM
UserWarning: Large object of size 8.59 MiB detected in task graph
My task is to parallelize CPU-bound document analysis. Maybe I should just use some Celery worker and keep this skill_extractor in a global variable, and Prefect is the wrong tool.

Kevin Kho
You could try broadcasting it. Dask has the scatter method on the Client for this, and this will prevent that back and forth.
From the Prefect side, you would use a resource manager, which lets you use the Dask Client commands.
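For illustration, a minimal sketch of a resource manager that opens a Dask Client for the flow run (assuming Prefect 1.x; the DaskClient name, the address parameter, and the wiring are illustrative, not from this thread):

from dask.distributed import Client
from prefect import Flow, resource_manager

@resource_manager
class DaskClient:
    """Open a Dask Client for the duration of the flow run."""
    def __init__(self, address=None):
        self.address = address  # assumption: address of an existing cluster

    def setup(self):
        # with address=None this starts a local cluster instead
        return Client(self.address)

    def cleanup(self, client):
        client.close()

with Flow("Analyze new documents") as flow:
    with DaskClient() as client:
        ...  # tasks that need Client commands (e.g. scatter) go here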
You can also use the Client in a task like this:
from dask.distributed import worker_client
from prefect import task

@task
def calling_compute_in_a_task(filepath):
    with worker_client() as client:
        # broadcast=True places a copy of skill_extractor on every worker
        client.scatter(skill_extractor, broadcast=True)
    return
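Note that scatter returns a future, and Dask may release scattered data once no future references it, so in practice you would keep the future around and hand it to whatever needs the extractor (a sketch, assuming everything runs on the same cluster):

with worker_client() as client:
    # keep the returned future; after broadcast each worker holds a local copy
    extractor_future = client.scatter(skill_extractor, broadcast=True)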
The only thing is that skill_extractor needs to be serializable, which I think it is from what you told me.
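As an alternative to broadcasting, and closer to the "global variable" idea above, you could cache the extractor per worker process so each Dask worker loads it only once (a sketch; the helper and the attribute name are illustrative, not an established API):

from dask.distributed import get_worker

def get_cached_extractor():
    # stash the extractor on the worker object so it is loaded once per process
    worker = get_worker()
    if not hasattr(worker, "_skill_extractor"):
        worker._skill_extractor = get_skill_extractor()
    return worker._skill_extractor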