# ask-community
**d:** Hi all. We're thinking about using Prefect for ETL in our project, and I'm trying to implement a POC. The basic idea: load documents from Elasticsearch (`detect_new_documents`), load the data that drives the extraction (`get_skill_extractor`), and save the results (`save_skills`). The extraction data should also be reloaded from the database periodically whenever a new version is added. I use this flow:
```python
from prefect import Flow, unmapped

with Flow("Analyze new documents") as flow:
    documents = detect_new_documents()
    extracted_skills = extract_skills.map(documents, unmapped(get_skill_extractor()))
    save_skills.map(documents, extracted_skills)
```
And it looks like I need some kind of stateful worker that keeps an instance of `skill_extractor` in memory and shares it across tasks, because right now it is shipped along with every task to the Dask workers, and serializing/deserializing takes more time than the processing itself. Maybe someone can give some advice on how this can be implemented with Prefect?
**k:** Hi @Dmytro Kostochko, whenever you work with Dask you're going to have that serializing/deserializing. How big is your data? I assume you're on Dask because it doesn't fit locally? I'm a bit unclear what `skill_extractor` is here. Is it a function or data?
**d:** It is a Python class instance with some logic to find specific keywords in text, basically many precompiled `re` patterns. When it is passed to a worker I see this warning:
```
UserWarning: Large object of size 8.59 MiB detected in task graph
```
My task is to parallelize CPU-bound document analysis. Maybe I should just use a Celery worker and keep this `skill_extractor` in a global variable, and Prefect is the wrong tool here?
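For what it's worth, a global-variable-style cache is also possible on Dask workers without switching to Celery: memoize the constructor at module level, so each worker process builds the extractor once and reuses it across mapped tasks. A minimal sketch, assuming the real loader is importable by the workers (`load_skill_extractor` and the pattern logic here are stand-ins, not code from this thread):

```python
import re
from functools import lru_cache

from prefect import task

def load_skill_extractor():
    # Stand-in for the real loader (DB query + pattern compilation)
    return [re.compile(p) for p in ("python", "dask", "prefect")]

@lru_cache(maxsize=1)
def cached_extractor():
    # lru_cache memoizes per process, so each Dask worker builds this once
    return load_skill_extractor()

@task
def extract_skills(document):
    patterns = cached_extractor()  # served from worker memory, never reserialized
    return [p.pattern for p in patterns if p.search(document)]
```

One caveat: this does not cover the periodic-reload requirement by itself; the cache would need a version key or TTL to pick up a new extractor version.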
**k:** I see. From my understanding, the class is being sent with each operation. You can persist the class on the workers by broadcasting it. Dask has the `scatter` method on the Client for this, and it will prevent that back and forth. From the Prefect side, you would use a resource manager, which lets you use the Dask Client commands. You can also use the Client in a task like this:
```python
from dask.distributed import worker_client
from prefect import task

@task
def calling_compute_in_a_task(filepath):
    # Runs on a Dask worker; worker_client() gives access to the cluster's Client
    with worker_client() as client:
        # Assumes skill_extractor is defined at module scope on the workers
        client.scatter(skill_extractor, broadcast=True)
        return
```
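To pick the scattered copy back up from other tasks, one option (a sketch, not something from this thread) is to publish the scattered future under a name with `Client.publish_dataset` and look it up later with `Client.get_dataset`; `extract` is a hypothetical method on the extractor:

```python
from dask.distributed import worker_client
from prefect import task

@task
def broadcast_extractor(extractor):
    with worker_client() as client:
        # broadcast=True places a copy of the object on every worker up front
        future = client.scatter(extractor, broadcast=True)
        client.publish_dataset(skill_extractor=future)  # register it by name

@task
def extract_skills(document):
    with worker_client() as client:
        extractor = client.get_dataset("skill_extractor").result()
    return extractor.extract(document)  # hypothetical method
```

`extract_skills` would need `broadcast_extractor` set as an upstream dependency so the dataset exists before the lookup.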
The only thing is that `skill_extractor` needs to be serializable, which I think it is from what you told me.
But yes, if the frequency of the operation is better suited to a task-queue setup, then Celery might be a better fit. Prefect is more intended for batch jobs.
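For reference, the resource manager pattern mentioned above looks roughly like this in Prefect 1.x (a sketch; the task body and the local `Client()` are assumptions, as is the string stand-in for the extractor):

```python
from dask.distributed import Client
from prefect import Flow, resource_manager, task

@resource_manager
class DaskClient:
    # setup() runs when the flow enters the block, cleanup() when it exits
    def setup(self):
        return Client()  # or Client(address) for an existing cluster

    def cleanup(self, client):
        client.close()

@task
def broadcast(client, obj):
    # Any Client command is available here, e.g. the broadcast scatter
    client.scatter(obj, broadcast=True)

with Flow("shared-extractor") as flow:
    with DaskClient() as client:
        broadcast(client, "stand-in for skill_extractor")
```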
**d:** Thank you for your answer 👍 I was also thinking of using scatter.