# ask-community
s
Hey all, I was wondering if anyone had experience with the `local_agent` slowly leaking memory? Over a day or two, the memory usage of our local agent service grows until it runs out, and then we run into exceptions when new flows try to be scheduled. It might not be a memory error, but our 3GB memory cap on the service doesn't seem to be sufficient.
k
Hey @Samuel Hinton, could it be because of persisting results? Do you have any Flows where results are not configured?
s
It might be, as I'm not too sure what you mean by "not configured".
k
If you have something like the code snippet below, the result of the `abc` task will be persisted because the default `Result` is a `LocalResult`. Saving the intermediate results is necessary so that a Flow can be restarted from a failed point.
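```python
from prefect import Flow, task

# With checkpointing on, abc's return value is persisted via the default LocalResult
@task
def abc(x):
    return x

with Flow("test") as flow:
    abc(1)
```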
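To show why checkpointed results pile up, here is a toy result store in plain Python, with no Prefect involved. It assumes that each write goes to a fresh date/UUID path. That assumption comes from the `2021/8/19/<uuid>.prefect_result` paths in the AzureResult logs later in this thread. `ToyResultStore` and `count_results` are hypothetical names.

```python
import pickle
import tempfile
import uuid
from datetime import date
from pathlib import Path


class ToyResultStore:
    """Hypothetical stand-in for a checkpointing result store (not Prefect's API)."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def write(self, value) -> Path:
        # Assumption: every write gets a unique date/UUID path, so nothing is overwritten.
        today = date.today()
        folder = self.root / f"{today.year}/{today.month}/{today.day}"
        folder.mkdir(parents=True, exist_ok=True)
        path = folder / f"{uuid.uuid4()}.prefect_result"
        path.write_bytes(pickle.dumps(value))
        return path


def count_results(root: Path) -> int:
    # Count every stored result under the root, however deeply nested.
    return sum(1 for _ in root.rglob("*.prefect_result"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        store = ToyResultStore(Path(tmp))
        for _run in range(3):  # three runs of the one-task flow above
            store.write(1)     # abc(1) returns 1, which gets checkpointed
        print(count_results(Path(tmp)))  # 3
```

No write ever reuses a path, so storage grows by one object per task run until something outside the flow cleans it up.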
s
Ahhh interesting. Is there a way such that once a flow is completed or a certain amount of time has passed the results are cleared from memory?
k
Prefect doesn’t delete stuff, but what some people do is always overwrite the latest one so you only have one set, like:
```python
from prefect import task

@task(target="xxx.json")  # fixed location instead of a new unique path per run
def abc(x):
    return x
```
s
Curious. We’re trying to have stateless services as much as possible (with as little filesystem access as possible). It seems like a common use case, right? The default behaviour of an agent shouldn't be to eventually run out of memory unless that trick is employed (or I might still be missing something).
k
I know what you mean. The flip side is that the default behavior supports people retrying from failure, though. You can persist the results somewhere like S3. You can also explicitly turn this off with:
```python
from prefect import task

@task(checkpoint=False)  # don't persist this task's result
def abc(x):
    return x
```
on each task. This won’t write anything at all.
s
The checkpointing is useful for retrying failed tasks. Would I be able to make a feature request for behaviour that allows for checkpointing but clears those results if the Flow as a whole succeeds? In the meantime, I'll go through and turn off checkpointing for all of our tasks and hope we don't have too many failures haha
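The requested "checkpoint, but clean up on success" behaviour can be sketched outside Prefect. This is a hypothetical helper, not an existing Prefect feature. It runs plain callables in order, pickles each result to a local directory, and deletes those files only when every step succeeds. The file naming and the use of `pickle` are assumptions for illustration.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Callable, List


def run_with_checkpoints(tasks: List[Callable[[], object]], checkpoint_dir: Path) -> bool:
    """Run tasks in order, checkpointing each result; delete checkpoints only if all succeed."""
    written: List[Path] = []
    for i, task in enumerate(tasks):
        try:
            value = task()
        except Exception:
            # Leave earlier checkpoints in place so a restart could reuse them.
            return False
        path = checkpoint_dir / f"task-{i}.pickle"
        path.write_bytes(pickle.dumps(value))
        written.append(path)
    # The whole "flow" succeeded, so the checkpoints are no longer needed for restarts.
    for path in written:
        path.unlink()
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ok = run_with_checkpoints([lambda: 1, lambda: 2], Path(tmp))
        print(ok, list(Path(tmp).iterdir()))  # True []
```

A real restart would also have to read those files back and skip the steps that already finished. The sketch leaves that part out.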
k
I don’t think Prefect is in the business of deleting stuff in general, but I’ll take note of it.
s
What is the recommended practice for local agents to get around it, then? Setting up an S3 bucket, I'm guessing?
k
Yep, either turn checkpointing off or use an S3 bucket.
s
Cheers. I'll see if I can hook up a bucket instead then 🙂 Thanks for the help
Sorry mate, another question from me: we have a few tasks, and rather than declaring `@task(… result=S3Result(bucket_name))` multiple times, is there a pass-through method attached to a flow that will set the result location for all tasks in the flow? Just trying to reduce code duplication as much as possible. If not, no worries, I'll make a global S3Result object and pass it around.
k
Yes, that’s right, you can do that: `with Flow("…", result=S3Result()) as flow:`
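This sketch models how a flow-wide default and a per-task `result=` might combine. It assumes the task-level setting wins and the flow-level one is only a fallback. The thread does not state that precedence, so check it against the Prefect docs. `ToyTask`, `effective_result`, and the location strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTask:
    name: str
    result: Optional[str] = None  # per-task result location, if one was set


def effective_result(task: ToyTask, flow_result: Optional[str]) -> Optional[str]:
    # Assumed precedence: a task-level result wins; otherwise use the flow-level default.
    return task.result if task.result is not None else flow_result


if __name__ == "__main__":
    flow_default = "shared-s3-result"
    print(effective_result(ToyTask("abc"), flow_default))                       # shared-s3-result
    print(effective_result(ToyTask("xyz", result="own-result"), flow_default))  # own-result
```

Under that assumption, setting the result once on the flow removes the duplication, and individual tasks can still opt out.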
s
Ah, perfect, thanks!
k
Hey guys, I think I have the same problem. Can you point me to the guide for using `S3Result`?
s
I ended up just turning off checkpointing, but if you do get `S3Result` working, please share the code. I was simply going to set `flow.result = S3Result(…)` and see if that worked 🙂
k
Haha, alright. Will let you know
k
That should work, or using `@task(result=…)`.
k
Hmm, but what about authentication @Kevin Kho?
k
Oh, for `S3Result`, see AWS credentials here.
k
@Kevin Kho, I'm assuming I have to `pip install prefect[aws]`?
k
Yes, that will include boto3.
k
Thanks @Kevin Kho
Hey, I use `AzureResult` and still face the memory leak issue:
```
2021-08-19T18:22:52.559235+00:00 heroku[worker.1]: Process running mem=832M(162.6%)
2021-08-19T18:22:52.571871+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:22:59.887172+00:00 app[worker.1]: [2021-08-19 18:22:59+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:04.488736+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/103ff5d5-aac1-4595-9d06-ed9819ce6465.prefect_result.
2021-08-19T18:23:04.501564+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:23:04.682508+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:23:04.788498+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:23:05.011633+00:00 app[worker.1]: [2021-08-19 18:23:05+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/2be57303-26f5-4ed7-9df4-6e7fe898b0ae.prefect_result...
2021-08-19T18:23:15.499663+00:00 app[worker.1]: [2021-08-19 18:23:15+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:20.057992+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/2be57303-26f5-4ed7-9df4-6e7fe898b0ae.prefect_result.
2021-08-19T18:23:20.071755+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:23:20.284908+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:23:20.372354+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:23:20.569703+00:00 heroku[worker.1]: Process running mem=806M(157.5%)
2021-08-19T18:23:20.577481+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:23:43.038443+00:00 heroku[worker.1]: Process running mem=808M(158.0%)
2021-08-19T18:23:43.040014+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:23:50.239216+00:00 app[worker.1]: [2021-08-19 18:23:50+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:50.265290+00:00 app[worker.1]: [2021-08-19 18:23:50+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/f4928631-7c4e-4e12-8269-147b374a188c.prefect_result...
2021-08-19T18:24:04.496479+00:00 heroku[worker.1]: Process running mem=1000M(195.4%)
2021-08-19T18:24:04.547135+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:24:13.287647+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/f4928631-7c4e-4e12-8269-147b374a188c.prefect_result.
2021-08-19T18:24:13.302258+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:13.524866+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:13.636673+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:13.892991+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/42d67f9b-e93b-49dc-bbfb-1efb4f26de43.prefect_result...
2021-08-19T18:24:30.532740+00:00 app[worker.1]: [2021-08-19 18:24:30+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:24:35.768330+00:00 app[worker.1]: [2021-08-19 18:24:35+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/42d67f9b-e93b-49dc-bbfb-1efb4f26de43.prefect_result.
2021-08-19T18:24:35.780349+00:00 app[worker.1]: [2021-08-19 18:24:35+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:36.229574+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:36.323417+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:36.546728+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/94525232-4ce6-4f95-a57f-a4b2a33ed390.prefect_result...
2021-08-19T18:24:39.634053+00:00 heroku[worker.1]: Process running mem=856M(167.2%)
2021-08-19T18:24:39.638760+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:24:47.850892+00:00 app[worker.1]: [2021-08-19 18:24:47+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:24:53.297335+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/94525232-4ce6-4f95-a57f-a4b2a33ed390.prefect_result.
2021-08-19T18:24:53.309001+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:53.504494+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:53.611531+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:53.811552+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/433adabd-fd64-4b98-a6b9-fef107a1185f.prefect_result...
2021-08-19T18:25:02.809571+00:00 heroku[worker.1]: Process running mem=952M(186.1%)
2021-08-19T18:25:02.823562+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:25:09.403863+00:00 app[worker.1]: [2021-08-19 18:25:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:25:13.931720+00:00 app[worker.1]: [2021-08-19 18:25:13+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/433adabd-fd64-4b98-a6b9-fef107a1185f.prefect_result.
2021-08-19T18:25:13.946559+00:00 app[worker.1]: [2021-08-19 18:25:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:25:14.167481+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:25:14.256477+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:25:14.510231+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/36ba704f-a890-4ecd-90d8-0db1b1ab41bd.prefect_result...
2021-08-19T18:25:27.675392+00:00 heroku[worker.1]: Process running mem=1050M(205.3%)
2021-08-19T18:25:27.691129+00:00 heroku[worker.1]: Error R15 (Memory quota vastly exceeded)
2021-08-19T18:25:27.693561+00:00 heroku[worker.1]: Stopping process with SIGKILL
2021-08-19T18:25:27.894522+00:00 heroku[worker.1]: Process exited with status 137
```
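You can pull the Heroku `Process running mem=...` lines out of a log like the one above with a small regex parser to see how memory moves between loop iterations. In this excerpt the samples are 832, 806, 808, 1000, 856, 952 and 1050 MB. Each percentage works out to a quota of about 512 MB (for example, 832 / 1.626 ≈ 512). So the worker was already over its limit at the first sample, well before the final R15 kill. The function names below are hypothetical, and the regex only covers the line shape seen here.

```python
import re
from typing import List, Tuple

# Matches lines like "Process running mem=832M(162.6%)" as they appear in the log above.
MEM_RE = re.compile(r"Process running mem=(\d+)M\((\d+(?:\.\d+)?)%\)")


def memory_samples(log_text: str) -> List[Tuple[int, float]]:
    """Return (megabytes, percent-of-quota) for every memory line in the log."""
    return [(int(mb), float(pct)) for mb, pct in MEM_RE.findall(log_text)]


def implied_quota_mb(mb: int, pct: float) -> float:
    # mem is pct% of the quota, so quota = mem / (pct / 100).
    return mb / (pct / 100)


if __name__ == "__main__":
    sample = (
        "2021-08-19T18:22:52.559235+00:00 heroku[worker.1]: Process running mem=832M(162.6%)\n"
        "2021-08-19T18:25:27.675392+00:00 heroku[worker.1]: Process running mem=1050M(205.3%)\n"
    )
    for mb, pct in memory_samples(sample):
        print(mb, round(implied_quota_mb(mb, pct)))  # 832 512, then 1050 511
```

Plotting the samples against the "Looped" transitions is one way to tell a steady leak apart from a per-iteration spike during the upload.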
k
Do you have a mapped task here? How are you using the result?
k
I don't use a mapped task, but a looped task.
Right now the result is not used yet.
k
Will look at this more tomorrow (bumping it up for myself)
k
Hi @Kevin Kho have you taken a look at this?
k
Sorry, forgot to respond. Feel free to ping me if I haven't gotten back to you within a day. To save memory, some people use the `Result` class directly inside the task:
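```python
import gc

from prefect import task
from prefect.engine.results import AzureResult

@task()
def abc():
    res = AzureResult(...)  # container/connection details elided
    data = ...  # build the large data here
    written = res.write(data)  # returns a new Result; .location says where it went
    location = written.location
    del data, written  # the returned Result also holds a reference to the data
    gc.collect()
    return location
```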
and then you only pass the location, so only that short string is kept in memory, and you read the data back in the downstream task. Your memory looks pretty low for data work, though? I'm seeing 1GB max. Not sure how much this “result inside the task” approach will help you.
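Here is the same pattern without any Prefect dependency. A local pickle file stands in for `AzureResult`, and two plain functions stand in for the upstream and downstream tasks. The file name, the `pickle` format, and the function names are assumptions for illustration. The point is that only the location string crosses the task boundary.

```python
import gc
import pickle
import tempfile
from pathlib import Path


def produce_and_store(out_dir: Path) -> str:
    """Upstream step: build the data, persist it, and return only its location."""
    data = list(range(100_000))  # stand-in for a large intermediate result
    location = out_dir / "location.pickle"  # hypothetical location name
    location.write_bytes(pickle.dumps(data))
    del data      # drop the only reference to the large object
    gc.collect()  # optional: CPython usually frees it as soon as the refcount hits zero
    return str(location)  # a short string is all that gets passed downstream


def consume(location: str) -> int:
    """Downstream step: load the data back from the location it was given."""
    data = pickle.loads(Path(location).read_bytes())
    return len(data)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        loc = produce_and_store(Path(tmp))
        print(consume(loc))  # 100000
```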
k
Hey @Kevin Kho, thanks for your response. Yeah, I'm not actively working on this right now, and it's actually just a hobby, so I'm using as much free infrastructure as I can. Yeah, agreed, it's a bit low on memory. How about reading the result from the returned location?
k
It’s a bit similar. It would be `res.read(location)`, I think.
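In Prefect 1.x, `Result.read(location)` returns a new `Result` object, and the loaded data is on its `.value` attribute. So the downstream task would likely need `res.read(location).value` rather than the return value itself. This thread does not confirm that, so check it against your installed version. The toy class below is a hypothetical `ToyResult` that stores pickle files on local disk. It only mimics that write/read shape and is not Prefect's implementation.

```python
import pickle
import tempfile
import uuid
from pathlib import Path
from typing import Any, Optional


class ToyResult:
    """Hypothetical local-disk result mimicking a write/read shape; not Prefect's class."""

    def __init__(self, directory: Path, value: Any = None, location: Optional[str] = None) -> None:
        self.directory = directory
        self.value = value
        self.location = location

    def write(self, value: Any) -> "ToyResult":
        # Store under a fresh name and return a new object that knows where the data went.
        location = f"{uuid.uuid4()}.pickle"
        (self.directory / location).write_bytes(pickle.dumps(value))
        return ToyResult(self.directory, value=value, location=location)

    def read(self, location: str) -> "ToyResult":
        # Load from the location and return a new object; the data is on .value.
        value = pickle.loads((self.directory / location).read_bytes())
        return ToyResult(self.directory, value=value, location=location)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        res = ToyResult(Path(tmp))
        location = res.write({"rows": 3}).location  # upstream: keep only the location
        print(res.read(location).value)  # downstream: {'rows': 3}
```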