Samuel Hinton
08/09/2021, 12:39 PMlocal_agent
slowly leaking memory? Over a day or two the memory usage of our local agent service will grow until its out and then we run into exceptions when new flows try to be scheduled. It might not be a memory error, but our 3GB memory cap on the service seems to be not be sufficient.Kevin Kho
08/09/2021, 1:26 PMSamuel Hinton
08/09/2021, 1:28 PMKevin Kho
08/09/2021, 1:31 PMabc
task will be persisted because the default Result
is a LocalResult
. Saving the intermediate results is necessary so that a Flow can be restarted from a failed point.
@task
def abc(x)
return x
with Flow("test") as flow:
abc(1)
Samuel Hinton
08/09/2021, 1:34 PMKevin Kho
08/09/2021, 1:36 PM@task(target="xxx.json")
def abc(x)
return x
Samuel Hinton
08/09/2021, 1:39 PMKevin Kho
08/09/2021, 1:59 PM@task(checkpoint=False)
def abc(x)
return x
on each task. This won’t write anything at all.Samuel Hinton
08/09/2021, 2:05 PMKevin Kho
08/09/2021, 2:10 PMSamuel Hinton
08/09/2021, 2:12 PMKevin Kho
08/09/2021, 2:15 PMSamuel Hinton
08/09/2021, 2:15 PM@task(… result=S3Result(bucket_name))
mutliple times, is there a pass-through method attached to a flow that will set the result location for all tasks in the flow? Just trying to reduce code dup as much as possible. If not, no worries, Ill make a global S3Result object and pass it aroundKevin Kho
08/09/2021, 2:23 PMwith Flow("…", result=S3Result()) as flow:
Samuel Hinton
08/09/2021, 2:25 PMKien Nguyen
08/12/2021, 9:53 AMS3Result
?Samuel Hinton
08/12/2021, 10:19 AMS3Result
working pelase share the code. I was simply going to set flow.result = S3Result(…)
and see if that worked 🙂Kien Nguyen
08/12/2021, 10:43 AMKevin Kho
08/12/2021, 2:42 PM@task(result=…)
Kien Nguyen
08/13/2021, 2:11 AMKevin Kho
08/13/2021, 2:53 AMKien Nguyen
08/13/2021, 11:24 AMpip install prefect[aws]
?Kevin Kho
08/13/2021, 1:52 PMKien Nguyen
08/13/2021, 1:58 PMAzureResult
and still face the memory leak issue2021-08-19T18:22:52.559235+00:00 heroku[worker.1]: Process running mem=832M(162.6%)
2021-08-19T18:22:52.571871+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:22:59.887172+00:00 app[worker.1]: [2021-08-19 18:22:59+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:04.488736+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/103ff5d5-aac1-4595-9d06-ed9819ce6465.prefect_result.
2021-08-19T18:23:04.501564+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:23:04.682508+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:23:04.788498+00:00 app[worker.1]: [2021-08-19 18:23:04+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:23:05.011633+00:00 app[worker.1]: [2021-08-19 18:23:05+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/2be57303-26f5-4ed7-9df4-6e7fe898b0ae.prefect_result...
2021-08-19T18:23:15.499663+00:00 app[worker.1]: [2021-08-19 18:23:15+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:20.057992+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/2be57303-26f5-4ed7-9df4-6e7fe898b0ae.prefect_result.
2021-08-19T18:23:20.071755+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:23:20.284908+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:23:20.372354+00:00 app[worker.1]: [2021-08-19 18:23:20+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:23:20.569703+00:00 heroku[worker.1]: Process running mem=806M(157.5%)
2021-08-19T18:23:20.577481+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:23:43.038443+00:00 heroku[worker.1]: Process running mem=808M(158.0%)
2021-08-19T18:23:43.040014+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:23:50.239216+00:00 app[worker.1]: [2021-08-19 18:23:50+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:23:50.265290+00:00 app[worker.1]: [2021-08-19 18:23:50+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/f4928631-7c4e-4e12-8269-147b374a188c.prefect_result...
2021-08-19T18:24:04.496479+00:00 heroku[worker.1]: Process running mem=1000M(195.4%)
2021-08-19T18:24:04.547135+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:24:13.287647+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/f4928631-7c4e-4e12-8269-147b374a188c.prefect_result.
2021-08-19T18:24:13.302258+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:13.524866+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:13.636673+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:13.892991+00:00 app[worker.1]: [2021-08-19 18:24:13+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/42d67f9b-e93b-49dc-bbfb-1efb4f26de43.prefect_result...
2021-08-19T18:24:30.532740+00:00 app[worker.1]: [2021-08-19 18:24:30+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:24:35.768330+00:00 app[worker.1]: [2021-08-19 18:24:35+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/42d67f9b-e93b-49dc-bbfb-1efb4f26de43.prefect_result.
2021-08-19T18:24:35.780349+00:00 app[worker.1]: [2021-08-19 18:24:35+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:36.229574+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:36.323417+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:36.546728+00:00 app[worker.1]: [2021-08-19 18:24:36+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/94525232-4ce6-4f95-a57f-a4b2a33ed390.prefect_result...
2021-08-19T18:24:39.634053+00:00 heroku[worker.1]: Process running mem=856M(167.2%)
2021-08-19T18:24:39.638760+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:24:47.850892+00:00 app[worker.1]: [2021-08-19 18:24:47+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:24:53.297335+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/94525232-4ce6-4f95-a57f-a4b2a33ed390.prefect_result.
2021-08-19T18:24:53.309001+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:24:53.504494+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:24:53.611531+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:24:53.811552+00:00 app[worker.1]: [2021-08-19 18:24:53+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/433adabd-fd64-4b98-a6b9-fef107a1185f.prefect_result...
2021-08-19T18:25:02.809571+00:00 heroku[worker.1]: Process running mem=952M(186.1%)
2021-08-19T18:25:02.823562+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
2021-08-19T18:25:09.403863+00:00 app[worker.1]: [2021-08-19 18:25:09+0000] DEBUG - prefect.CloudFlowRunner | Checking flow run state...
2021-08-19T18:25:13.931720+00:00 app[worker.1]: [2021-08-19 18:25:13+0000] DEBUG - prefect.AzureResult | Finished uploading result to 2021/8/19/433adabd-fd64-4b98-a6b9-fef107a1185f.prefect_result.
2021-08-19T18:25:13.946559+00:00 app[worker.1]: [2021-08-19 18:25:13+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Running to Looped
2021-08-19T18:25:14.167481+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Handling state change from Pending to Running
2021-08-19T18:25:14.256477+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.CloudTaskRunner | Task 'extract_historical_trades': Calling task.run() method...
2021-08-19T18:25:14.510231+00:00 app[worker.1]: [2021-08-19 18:25:14+0000] DEBUG - prefect.AzureResult | Starting to upload result to 2021/8/19/36ba704f-a890-4ecd-90d8-0db1b1ab41bd.prefect_result...
2021-08-19T18:25:27.675392+00:00 heroku[worker.1]: Process running mem=1050M(205.3%)
2021-08-19T18:25:27.691129+00:00 heroku[worker.1]: Error R15 (Memory quota vastly exceeded)
2021-08-19T18:25:27.693561+00:00 heroku[worker.1]: Stopping process with SIGKILL
2021-08-19T18:25:27.894522+00:00 heroku[worker.1]: Process exited with status 137
Kevin Kho
08/20/2021, 2:29 PMKien Nguyen
08/21/2021, 2:54 AMKevin Kho
08/23/2021, 5:34 AMKien Nguyen
08/31/2021, 6:35 AMKevin Kho
08/31/2021, 2:34 PM@task()
def abc():
res = AzureResult(...)
data = ...
res.write(data, location = "location.csv")
del data
gc.collect()
return location
and then you only pass the location so this is kept in memory, and then you read it from the downstream task.
You memory looks pretty low for data work though? I am seeing 1GB max? Not sure how much this “result inside the task” will help you.Kien Nguyen
09/04/2021, 3:14 PMKevin Kho
09/05/2021, 2:06 AMres.read(location)
I think