# prefect-community

as

09/23/2020, 12:51 PM
Hi, I was wondering whether prefect always keeps its task result values in memory during the whole flow run, or whether they get removed from memory when writing to a LocalResult target is enabled. As an example, what happens in the following task?
```python
from prefect import task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

jsonresult = task(
    get_json_fun,
    result=LocalResult(serializer=JSONSerializer()),
    target='/path/result.json',
    checkpoint=True,
)(p)
```
Is jsonresult still kept in memory in addition to being written to result.json? I'm worried that my machine will run out of memory when my flow gets bigger, with many task results floating around.

josh

09/23/2020, 1:24 PM
Hi @as, how are you running your flow? If the process your flow is running in is still alive (e.g. running in ipython or a notebook or something) then I believe the result should still exist in memory as well as being written to a local file. If you are running this flow through the use of a Prefect agent then the result should only be persisted to a local file location and should not be stored in memory
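As a rough illustration (an untested sketch assuming Prefect 0.13-era APIs; the `inc` task and flow are made up), you can see the in-memory copy by inspecting the State returned from an interactive run:
```python
from prefect import task, Flow

@task
def inc(x):
    return x + 1

with Flow("demo") as flow:
    r = inc(1)

state = flow.run()
# While this process stays alive, the task's return value remains
# reachable in memory through the returned State object:
print(state.result[r].result)  # -> 2
```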

as

09/23/2020, 1:28 PM
Hi, thanks for answering. I run the flow locally, without the server part. Is there no way to let it persist only as a local file during the flow run this way? What about flows that produce many sizeable result objects? The memory of your machine can run out pretty fast, I would think.

josh

09/23/2020, 1:30 PM
This is more of a Python question than a prefect one because it depends on what you are doing. If you have a notebook that does something like this:
```python
final1 = flow.run()
final2 = flow.run()
final3 = flow.run()
```
Then as long as that process is alive all three of those states w/ task results will exist in memory. If you run your flow as a script like:
```bash
python my_flow_file.py
```
Then the results that were in memory will no longer exist in memory because the process ended. In both cases the results will still be written to a local file using the LocalResult.

as

09/23/2020, 1:45 PM
I understand this. My problem is what happens if you run out of memory during flow.run(). Say for example you are processing many (e.g. millions of) txt files:
- task1 produces a list of paths to these files
- task2 maps over the output of task1 and calculates some intermediate result for each file (e.g. feature extraction)
- task3 does something else with the output of task2
- etc...
I can imagine your machine will run out of memory during step 2. I understand you can write the intermediate json results to disk yourself as part of task2 and read them back in during task3. But I was hoping I could keep the functions in my Tasks pure and abstract disk I/O away as part of the prefect flow. I hope I could make myself clear enough 🙂
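A rough sketch of the pipeline being described (hypothetical helper tasks; Prefect 0.13-era APIs assumed):
```python
from prefect import task, Flow
from prefect.engine.results import LocalResult

@task
def list_files():
    return ["a.txt", "b.txt"]  # imagine millions of paths here

@task(result=LocalResult(dir="/tmp/results"), checkpoint=True)
def extract_features(path):
    return {"path": path, "length": len(path)}  # stand-in for real feature extraction

@task
def combine(features):
    return len(features)

with Flow("feature-pipeline") as flow:
    paths = list_files()
    feats = extract_features.map(paths)  # step 2: one result per file
    combine(feats)
```
Even though each mapped result is checkpointed to disk here, every return value is also held in memory for the rest of the run, which is exactly the concern above.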

josh

09/23/2020, 1:52 PM
Ah I see what you’re saying, thanks for clarifying! Yeah, running out of memory during a run is definitely possible (especially when running on a single machine) and has the potential to cause weird issues due to resource constraints. If you’re performing memory-intensive tasks like this and you’re worried about hitting limits, I would recommend looking into prefect’s native Dask integration, which will let you farm out your task execution to workers on a Dask cluster. This allows you to spin up / work with Dask clusters that can be distributed across many machines with various amounts of memory
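For reference, a minimal sketch of that (assuming the 0.13-era import path; the scheduler address is a placeholder):
```python
from prefect.engine.executors import DaskExecutor

# Run the flow's tasks on an existing distributed Dask cluster so the
# work (and its memory use) is spread across workers, not one process.
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```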

as

09/23/2020, 2:05 PM
Hey, yes, I understand the benefits of parallelisation with Dask. But I'm not sure it completely covers what I was trying to get at. My previous example was probably not clear enough. Consider the following example:
- task1 processes the input and generates a fairly large output object (e.g. a pandas dataframe, or a big numpy matrix) that is hundreds of MB
- task2 does some manipulation of the task1 result and produces another sizeable object
- task3 does some manipulation of the task2 result and produces another sizeable object
- etc.
Each result fits easily in memory, but if you have many such tasks in your flow you will run out of memory fast. In this case it would be favorable to serialize the intermediate task results to disk at each step and drop them from memory while the flow is running. I don't think Dask can solve this issue.
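To make the pattern concrete, an untested sketch of such a chain (the tasks and sizes are made up):
```python
import numpy as np
from prefect import task, Flow

@task
def make_matrix():
    return np.random.rand(5000, 5000)  # ~200 MB of float64

@task
def transform(m):
    return m @ m.T                     # another ~200 MB

@task
def summarize(m):
    return float(m.mean())

with Flow("large-objects") as flow:
    a = make_matrix()
    b = transform(a)
    summarize(b)

# During flow.run(), the values of a and b both stay referenced by the
# runner until the run ends, even if they are also checkpointed to disk.
state = flow.run()
```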

josh

09/23/2020, 2:15 PM
If we had a way to opt into something where a user could say “Store this task result but don’t keep it in memory, then have the downstream task load from the result location” I think this would resolve exactly what you’re after, wouldn’t it? 🤔
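Purely hypothetical sketch of what such an opt-in might look like (the `keep_in_memory` flag does not exist in prefect; this is only illustrating the proposed shape):
```python
from prefect import task
from prefect.engine.results import LocalResult

# HYPOTHETICAL: keep_in_memory is not a real prefect flag.
@task(result=LocalResult(dir="/tmp/results"),
      checkpoint=True,
      keep_in_memory=False)  # drop the value after writing; downstream re-reads it
def big_task():
    ...
```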

as

09/23/2020, 2:21 PM
Yes, I think it would. 🙂
I guess it would make sense as an option in the result class or result handlers

josh

09/23/2020, 2:24 PM
Would you mind opening a feature request on the repo for this? I don’t think there is a pure prefect way of doing this (I could be missing something), but I think it could be implemented! As a workaround you could also skip prefect’s result interface and go with something more manual in your tasks for the time being 🙂
```python
from prefect import task, Flow

# get_data / write_to_file / load_from_file are placeholder helpers
@task
def a():
  d = get_data(...)
  location = write_to_file(d)   # persist the large object yourself
  return location               # only the small path string is returned

@task
def b(location):
  d = load_from_file(location)  # re-read the data where it's needed
  ...

with Flow(...) as f:
  location = a()
  b(location)
```
^ this only passes around the location of the data
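Filling in those placeholders, a self-contained version of the same idea might look like this (untested sketch; pickle and temp files chosen arbitrarily):
```python
import os
import pickle
import tempfile

from prefect import task, Flow

def write_to_file(obj):
    # Persist the object and hand back only its path.
    fd, location = tempfile.mkstemp(suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(obj, f)
    return location

def load_from_file(location):
    with open(location, "rb") as f:
        return pickle.load(f)

@task
def a():
    d = list(range(1_000_000))  # stand-in for a large object
    return write_to_file(d)     # only the small path string crosses tasks

@task
def b(location):
    d = load_from_file(location)
    return len(d)

with Flow("pass-locations") as f:
    b(a())
```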

as

09/23/2020, 2:33 PM
Yes, you are right, I could do this. But I was hoping to keep my functions free of I/O and let prefect handle this automatically. If multiple tasks depend on one another or have multiple inputs, it can get unwieldy fast and your code gets cluttered with many read and write functions. (Or I am just being lazy 😛) I'll look into opening a feature request
👍 1
(btw is there a way to reference this slack thread in a github issue?)

josh

09/23/2020, 2:36 PM
Ah yeah let me open an issue directly from this actually
@Marvin open “Add result option to not persist data in memory and fall back on result interface read/write”

as

09/23/2020, 3:42 PM
Great, thanks!!