
Avi A

04/30/2020, 1:11 PM
Hey there! I’ve started implementing our flow with Prefect and it was super fun! It’s currently a basic ETL (actually EL) that reads data from Elastic to a local filesystem. The flow is basically a mapped extraction, one task per day (input: day to extract, output: data for that day), followed by a load for each such extract (also a simple map). Now one of the tasks is running out of memory. It read only 250k records, which shouldn’t cause an OOM error. I’m using a local system (UI + one agent). Any ideas on how to start debugging?
Just to make it clear: the task was able to read all the data and put it into a list in its memory. The error comes while running the `LocalResultHandler`.
Also, it seems that Prefect tries to run all the extraction tasks and only then turns to running tasks from the load phase. Does this mean that practically ALL data is stored in memory? If that’s the case then it’s no wonder I’m getting OOM errors.

Jeremiah

04/30/2020, 1:29 PM
@Avi A if you’re using a `.map` operator, then at this time all data is stored in memory across children tasks. If you want to follow this issue, an enhancement to mapping is coming that will improve the handling of children tasks.
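To illustrate the memory pattern described here, the following is a plain-Python sketch, not Prefect code. `extract`, `run_map_then_map`, and `run_day_by_day` are hypothetical names standing in for the mapped tasks. It only counts how many extracted batches are held at once when every extract finishes before any load starts, compared with handling one day at a time.

```python
from typing import Callable, Iterable, List


def extract(day: int) -> List[int]:
    """Hypothetical stand-in for one mapped extraction: returns that day's records."""
    return [day] * 1000


def run_map_then_map(days: Iterable[int], load: Callable[[List[int]], None]) -> int:
    """Finish every extract before any load; return the peak number of batches held."""
    batches = [extract(day) for day in days]  # every day's result is alive at once
    peak = len(batches)
    for batch in batches:
        load(batch)
    return peak


def run_day_by_day(days: Iterable[int], load: Callable[[List[int]], None]) -> int:
    """Extract and load one day at a time; return the peak number of batches held."""
    peak = 0
    for day in days:
        batch = extract(day)  # the previous day's batch is no longer referenced
        load(batch)
        peak = 1
    return peak
```

With 30 days of input, the first function holds 30 batches at its peak and the second holds one, which is the difference the OOM question above is getting at.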

Avi A

04/30/2020, 1:30 PM
Thanks @Jeremiah! Any way to work around this while you fix? For example if I use some different results handler?

Jeremiah

04/30/2020, 1:31 PM
I believe you can work around this unless you have a reduce step after your map (which would gather all results back to the parent task). I think if you use a `DaskExecutor` to scatter the work across a Dask cluster, then you won’t run into memory errors.

Avi A

04/30/2020, 1:35 PM
What if I use a local Dask executor? Would that help? I don’t need to spin up a cluster at the moment and would like to defer that to a later time (to get our devops involved, and perhaps when we move to Prefect Cloud).

Jeremiah

04/30/2020, 1:36 PM
I think it might, because it would take each task, run it on the cluster (with whatever parallelism you allowed) and then kill the process - but to be completely honest I’m not totally sure. I do think it’s worth a shot.

Avi A

04/30/2020, 2:21 PM
Thanks, I’ll try and report back.
@Jeremiah how do I use the Dask executor on a registered flow? I see the executor is a parameter for `flow.run`, but I’m using `flow.register` and then I want to run the flow via the UI. Where do I specify the executor?

Jeremiah

04/30/2020, 4:46 PM
Hi Avi, you’ll want to attach an `Environment` object to your flow that tells Prefect how to run the flow (and which executor to use). A simple one to start with is the `RemoteEnvironment` because it lets you specify any executor - here are the intro docs with some example code.
👍 1
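A minimal sketch of what attaching such an environment might look like, assuming the Prefect 0.x API in use at the time of this thread (later releases changed or removed these classes). The flow name `elastic-extract`, the helper `build_flow`, and the placeholder task are hypothetical.

```python
# Sketch for the Prefect 0.x API discussed in this thread (spring 2020).
# Later releases changed or removed these classes, so treat the imports as assumptions.

# Dotted path that RemoteEnvironment resolves to an executor class at run time.
EXECUTOR_PATH = "prefect.engine.executors.LocalDaskExecutor"
# Keyword arguments forwarded to the executor; "processes" is another common choice.
EXECUTOR_KWARGS = {"scheduler": "threads"}


def build_flow(days):
    """Build a flow whose environment names the executor for agent-run flow runs."""
    # Imported lazily so this module still loads where Prefect 0.x is not installed.
    from prefect import Flow, task
    from prefect.environments import RemoteEnvironment

    @task
    def extract_and_load(day):
        # Hypothetical placeholder for the real per-day work.
        return day

    environment = RemoteEnvironment(
        executor=EXECUTOR_PATH,
        executor_kwargs=EXECUTOR_KWARGS,
    )
    with Flow("elastic-extract", environment=environment) as flow:
        extract_and_load.map(day=days)
    return flow
```

Under those assumptions, calling `flow.register` on the returned flow would store the environment with it, so runs started from the UI pick up the executor without passing it to `flow.run`.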

Avi A

05/03/2020, 6:27 AM
@Jeremiah following up: using `LocalDaskExecutor` seems to have solved the OOM issues, but the process still hangs at some point and IDK why. I ended up doing the Extraction+Load together in the same task, since I wanted to add a SKIP mechanism which would’ve created an unhealthy dependence between the tasks anyway. Now I have the flow running and it’s super fun.
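A rough sketch of the combined extract+load step with a skip check, in plain Python. The function name, the `fetch` callable, and the one-file-per-day `.jsonl` layout are assumptions for illustration; the thread does not show the actual implementation.

```python
import json
from pathlib import Path
from typing import Callable, Iterable, Mapping


def extract_and_load(
    day: str,
    out_dir: Path,
    fetch: Callable[[str], Iterable[Mapping]],
) -> bool:
    """Write one day's records to out_dir/<day>.jsonl; return False if skipped."""
    target = out_dir / f"{day}.jsonl"
    if target.exists():
        # Skip mechanism: a finished file means this day was already handled.
        return False

    out_dir.mkdir(parents=True, exist_ok=True)
    tmp = target.with_name(target.name + ".tmp")
    with tmp.open("w", encoding="utf-8") as handle:
        # Write records as they arrive instead of collecting them in a list first.
        for record in fetch(day):
            handle.write(json.dumps(record) + "\n")
    tmp.replace(target)  # rename only after the whole file has been written
    return True
```

Inside a Prefect 0.x task, the skip branch could raise `prefect.engine.signals.SKIP` instead of returning `False`, so the run shows up as skipped in the UI. Writing to a temporary file first keeps a crashed run from leaving a partial file that a later run would mistake for a finished day.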

Jeremiah

05/03/2020, 3:14 PM
Glad to hear it. In addition, we’ve discovered an occasional interaction between Prefect’s use of a Dask `worker_client` for mapping and certain use cases, which will be solved in the upcoming mapping refactor. What you’re describing sounds like some of the symptoms we’ve seen there.

Avi A

05/03/2020, 3:16 PM
You’re doing a wonderful job. Thank you! 💪