
Calculate the memory used by a process and its descendants

The issue / Need

I guess the issue is:

Is there a way to calculate, more or less, how much memory a process is using, including large binaries and child processes?

The solution

One approach to the problem would be:

  • Find out what the “process tree” associated with the process we’re studying is.
  • Sum the memory Erlang reports for each one of the processes in that “process tree”.
  • Watch out for the binary memory consumption of every process inside that “process tree”.

“Process tree”? What “process tree”?

Keep in mind that in Elixir it’s extremely easy to spawn processes to help out. Say, for example …

{:ok, counter_pid} = Agent.start(fn -> 0 end)

some_iterable = [1, 2, 3, 4, 5]

some_iterable |> Enum.each(fn element -> Agent.update(counter_pid, fn count -> count + element end) end)

IO.puts "final count: #{inspect(Agent.get(counter_pid, fn count -> count end))}"

I know, I know, it’s a worthless piece of code, but it illustrates the point: creating processes is extremely cheap and easy in Elixir. If, at any given moment, we wanted to know how much memory this code is using, we would have to know:

  • How much memory the process where the code is running takes.
  • How much memory the process where the agent is running takes (the one identified by counter_pid).
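A rough first measurement for the snippet above could look like the following sketch. It only adds up what Process.info/2 reports for the calling process and for the Agent, so it ignores large binaries (more on that below). The CounterMemory module name is just illustrative.

```elixir
defmodule CounterMemory do
  # Runs the counter example and returns the memory (in bytes) that
  # Process.info/2 reports for the calling process and for the Agent.
  def measure do
    {:ok, counter_pid} = Agent.start(fn -> 0 end)

    Enum.each([1, 2, 3, 4, 5], fn element ->
      Agent.update(counter_pid, fn count -> count + element end)
    end)

    {:memory, own} = Process.info(self(), :memory)
    {:memory, agent} = Process.info(counter_pid, :memory)
    Agent.stop(counter_pid)

    %{own: own, agent: agent, total: own + agent}
  end
end
```

The actual numbers depend on the VM version and on what the calling process has been doing, so treat them as indicative only.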

So the first thing to find out when studying a process is which processes, if any, are “derived” from it. Say our code is being executed inside some process “A”, and part of our code uses Task.async/1; that would launch another process, let’s call it process “B”. So now, if we want to know how much memory our code is consuming, we should consider the memory consumption of process “A” and also that of process “B”.

So we can talk about a “process tree” derived from the main process. Let’s say our “main” process is called “A” and spawns three processes, “A1”, “A2” and “A3”. Now suppose “A1” spawns two processes, “A11” and “A12”, and “A3” spawns one process, “A31”. Now, if we were to find out how much memory is consumed because of process “A”, we would have to know:

  • The memory used by process “A”
  • The memory used by processes “A1”, “A2” and “A3”
  • The memory used by processes “A11”, “A12” and “A31”

You can clearly see how a “process tree” is derived.
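To make the idea concrete, here’s a sketch that sums memory over a hand-built tree shaped like the A / A1 / A11 example. The ProcessTreeSketch module, the map shape and the byte counts are all made up for illustration; real numbers would come from Process.info/2.

```elixir
defmodule ProcessTreeSketch do
  # Hypothetical, hand-built "process tree": each node has its own memory
  # (made-up byte counts) and a list of children.
  def example_tree do
    %{name: "A", memory: 100, children: [
      %{name: "A1", memory: 10, children: [
        %{name: "A11", memory: 1, children: []},
        %{name: "A12", memory: 1, children: []}
      ]},
      %{name: "A2", memory: 10, children: []},
      %{name: "A3", memory: 10, children: [
        %{name: "A31", memory: 1, children: []}
      ]}
    ]}
  end

  # Memory of a node plus the memory of all its descendants.
  def total_memory(%{memory: mem, children: children}) do
    mem + (children |> Enum.map(&total_memory/1) |> Enum.sum())
  end
end
```

With these made-up numbers, ProcessTreeSketch.total_memory(ProcessTreeSketch.example_tree()) adds 100 + 12 + 10 + 11 and gives 133.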

How the Erlang VM manages binaries

Now the question: how do we measure how much memory a process is using? For that, we need to know that the memory used to store binaries may not be included in what Erlang considers to be the memory “used” by the process:

  • If the code within a process has a binary of 64 bytes or less, then that binary is stored in the process memory (on its heap), so when Erlang says that the process uses some amount of memory, that includes the memory used to store the binary.
  • But if the binary is larger than 64 bytes, then the binary is NOT stored in the process memory, but in a shared, reference-counted area of the VM memory (the process only keeps a small reference to it). Therefore, when Erlang says that the process uses some amount of memory, that doesn’t include the memory used to store the binary; that information has to be looked up elsewhere. Why does Erlang do this? I guess it’s because that way other processes can access those “external” binaries by reference. Say process “A” and process “B” both use exactly the same binary, and that binary takes 2 MB. They can both use it by reference, and the memory needed to store it is used only once, not twice.
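A quick way to see the 64-byte threshold in action is to build binaries of different sizes in a fresh process and look at Process.info(self(), :binary). This is a minimal sketch (the BinaryDemo name is hypothetical); it relies on :binary.copy/2 building a brand-new binary of the requested size.

```elixir
defmodule BinaryDemo do
  # Builds a binary of `size` bytes inside a fresh process and returns the
  # sizes of the off-heap binaries that process references.
  def off_heap_sizes(size) do
    parent = self()

    spawn(fn ->
      bin = :binary.copy("a", size)
      {:binary, bins} = Process.info(self(), :binary)
      # `bin` is still used here, so it is alive while we inspect the process
      send(parent, {:sizes, byte_size(bin), Enum.map(bins, fn {_id, s, _refc} -> s end)})
    end)

    receive do
      {:sizes, _size, sizes} -> sizes
    end
  end
end
```

In my understanding, BinaryDemo.off_heap_sizes(1000) should list a 1000-byte entry, while BinaryDemo.off_heap_sizes(10) shouldn’t, because a 10-byte binary lives on the process heap.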

The tool

The basic tool we use to obtain info from a process is:

Process.info(pid, <some_atom>)

where <some_atom> refers to the “kind of info” we want to retrieve from the process identified by pid. Let’s see what kind of info we can get for a process, depending on the value of <some_atom>.
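Process.info/2 also accepts a list of items, which is handy when we need several of them at once: the result is a keyword list in the same order as requested, or nil if the process is no longer alive. A small sketch (InfoDemo is a hypothetical name):

```elixir
defmodule InfoDemo do
  # Asks for several items at once; returns a keyword list in the requested
  # order, or nil when the process is dead.
  def basic_info(pid) do
    Process.info(pid, [:current_function, :status, :memory])
  end
end
```

This list form is also what the MemoryMonitor code at the end of the post relies on.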

Basic information about a process

  • :current_function

    > Process.info(pid, :current_function)
    {:current_function, {SomeModule, :some_function, 2}}

    Tells us which function is running in this process right now: it provides the module name, the function name within that module, and the arity of the function.

  • :status

    > Process.info(pid, :status)
    {:status, :running}

    Tells us the status of the process right now. So far, I’ve seen two statuses, and I can reasonably guess what they mean (the Erlang docs also list :runnable, :garbage_collecting, :exiting and :suspended):

    • :running: The process is actually running some code
    • :waiting: The process is doing nothing, waiting for a message (see the receive special form about that)
  • :memory

    > Process.info(pid, :memory)
    {:memory, 426552}

    Tells us how much memory (in bytes) Erlang says this process is using: its stack, heap and internal structures. Remember, this does not include the memory used by any binary larger than 64 bytes that the process may be referencing.
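To see these three items together, the sketch below spawns a process that just sits in a receive and inspects it from the outside. The StatusDemo name and the 100 ms sleep (a crude way to let the new process reach its receive) are my own assumptions, not something Erlang guarantees.

```elixir
defmodule StatusDemo do
  # Spawns a process that blocks in `receive` and inspects it from outside.
  def inspect_idle_process do
    pid =
      spawn(fn ->
        receive do
          :stop -> :ok
        end
      end)

    # crude, timing-based: give the new process time to reach `receive`
    Process.sleep(100)
    info = Process.info(pid, [:current_function, :status, :memory])
    send(pid, :stop)
    info
  end
end
```

I’d expect the status to be :waiting here, and :current_function to point at the anonymous function generated for the spawned fn.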

What are “linked processes”?

> Process.info(pid, :links)
{:links, [#PID<0.805.0>, #PID<0.806.0>, #PID<0.807.0>, #PID<0.782.0>]}

The :links atom tells us about the processes or ports linked to this process. That is, processes that will be notified (and, unless they trap exits, may die) when this process dies.
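Links work both ways: a process appears in the :links list of every process it’s linked to, and those processes appear in its own list. A minimal sketch (LinksDemo is a hypothetical name):

```elixir
defmodule LinksDemo do
  # After spawn_link/1, the parent lists the child in its :links and the
  # child lists the parent in its own :links.
  def links_both_ways? do
    child =
      spawn_link(fn ->
        receive do
          :stop -> :ok
        end
      end)

    {:links, parent_links} = Process.info(self(), :links)
    {:links, child_links} = Process.info(child, :links)
    send(child, :stop)
    child in parent_links and self() in child_links
  end
end
```

This is why, when walking the links of a “process tree”, we have to remember which processes we’ve already visited; otherwise we’d bounce back and forth between parent and child.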

In Elixir, when two processes are “linked”, it means that if one of them dies of non-natural causes, the other one dies, too. What are “natural causes”? When a process dies, all its linked processes receive an exit signal carrying the “reason of death”. If that reason is :normal, it’s considered a “natural death” and the linked processes don’t have to die. With any other reason, a linked process that is not trapping exits dies as well.

The only “natural causes” reason that pure Elixir/Erlang accepts for links is :normal; even :shutdown and {:shutdown, whatever} kill linked processes that don’t trap exits. However, if you’re creating your processes following the OTP Design Principles, then you’re using :proc_lib for that (even if you don’t know it), and :proc_lib also treats :shutdown and {:shutdown, whatever} as “natural causes” in the sense that it doesn’t generate a crash report for them (supervisors also treat them as normal exits when deciding whether to restart a :transient child).
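Here’s a sketch to check that behaviour: a child exits with a given reason, and we watch whether the (non-trapping) process linked to it survives. ExitReasonDemo and the 100 ms wait are illustrative assumptions.

```elixir
defmodule ExitReasonDemo do
  # Spawns a non-trapping "middle" process, links it to a child that exits
  # with `reason`, and reports whether the middle process survived.
  def middle_survives?(reason) do
    parent = self()

    middle =
      spawn(fn ->
        spawn_link(fn -> exit(reason) end)
        # if the exit signal kills us, we never get past this sleep
        Process.sleep(100)
        send(parent, :survived)
      end)

    ref = Process.monitor(middle)

    receive do
      :survived ->
        # drop the :DOWN message that follows the normal exit
        Process.demonitor(ref, [:flush])
        true

      {:DOWN, ^ref, :process, ^middle, _reason} ->
        false
    end
  end
end
```

In my understanding, middle_survives?(:normal) returns true, while :shutdown and {:shutdown, :whatever} both return false, since links only spare :normal.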

What is the “process dictionary”?

> Process.info(pid, :dictionary)
{:dictionary,
  [
    "$initial_call": {IEx.Evaluator, :init, 5},
    ...,
    "$ancestors": [#PID<0.2599.0>],
    "$callers": [#PID<0.2599.0>],
    ...
  ]}

The :dictionary atom tells us about the process dictionary, that is, a key/value store that every process has, and that we can add key/value pairs to … It can be completely empty (it’s not guaranteed to contain anything at all).

When you use :proc_lib to create a process (again, maybe you don’t know you’re using it), then …

Some useful information is initialized when a process starts. The registered names, or the process identifiers, of the parent process, and the parent ancestors, are stored together with information about the function initially called in the process.

That means you can expect to find a non-empty process dictionary, and you can expect it to have the :"$ancestors" key (set by :proc_lib) or the :"$callers" key (set by Elixir abstractions such as Task) …
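The process dictionary can be written and read with Process.put/2 and Process.get/1, and Process.info(pid, :dictionary) returns it as a list of {key, value} tuples. The sketch below also peeks at the :"$callers" entry of a process started with Task.async/1; DictionaryDemo and :my_key are hypothetical names.

```elixir
defmodule DictionaryDemo do
  # Stores and reads a key in the calling process's own dictionary.
  def put_and_get do
    Process.put(:my_key, 42)
    Process.get(:my_key)
  end

  # A process started with Task.async/1 finds its caller at the head of "$callers".
  def task_callers do
    Task.async(fn -> Process.get(:"$callers") end)
    |> Task.await()
  end
end
```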

Am I using :proc_lib?

All these built-in abstractions use it “under the hood”:

  • GenServer functions
  • Agent functions
  • Task.async
  • Task.async_stream, for the processes used to run each element of the given enumerable.

So yeah, unless you’re using spawn directly (and things like that), I’d say it’s very likely that you’re using :proc_lib everywhere.

There’s a good discussion about it in the Elixir forums, though.
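If you want to check a specific process, one heuristic (my assumption, based on the :proc_lib behaviour described above) is to look for the :"$ancestors" key in its dictionary, which a plain spawn/1 doesn’t set. ProcLibCheck is a hypothetical name.

```elixir
defmodule ProcLibCheck do
  # Heuristic: processes started through :proc_lib get an "$ancestors" entry
  # in their process dictionary; plain spawned processes don't.
  def proc_lib_started?(pid) do
    case Process.info(pid, :dictionary) do
      {:dictionary, dict} -> List.keymember?(dict, :"$ancestors", 0)
      # the process is dead
      nil -> false
    end
  end
end
```

Nothing stops code from calling Process.put(:"$ancestors", ...) by hand, so this is a hint, not a proof.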

The “binaries” of a process

> Process.info(pid, :binary)
{:binary, [{139843198370240, 256, 1}, {94479811247752, 256, 1}]}

As said before, if a binary used by a process is larger than 64 bytes, then it’s not stored in the process’s memory, but outside of it. Process.info(pid, :binary) returns a list of tuples with the following structure: {binary_id, size_in_bytes, reference_count}, where the reference count is how many references to that binary exist in the whole VM, not only in this process. So, in order to know how much “binary” memory is used, we have to sum the sizes from this list but, remember, only once per binary! Say process “A” has {:binary, [{111111, 256, 2}, ...]} as its binary list and process “B” has {:binary, [..., {111111, 256, 2}, ...]}, and suppose process “B” belongs to the “process tree” derived from “A”. Then the memory of {111111, 256, _} must be counted only once, even though two processes are using it.
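The “count each binary only once” rule can be sketched as a small function over the :binary lists of all the processes in the tree. BinaryMemory is a hypothetical name, and the input is assumed to be one list per process, in the {binary_id, size_in_bytes, reference_count} shape described above.

```elixir
defmodule BinaryMemory do
  # Sums the sizes of off-heap binaries referenced by several processes,
  # counting each binary id only once.
  def total(binary_lists) do
    binary_lists
    |> List.flatten()
    |> Enum.uniq_by(fn {id, _size, _refc} -> id end)
    |> Enum.map(fn {_id, size, _refc} -> size end)
    |> Enum.sum()
  end
end
```

For the “A”/“B” example, BinaryMemory.total([[{111111, 256, 2}], [{111111, 256, 2}, {222222, 100, 1}]]) counts the shared 256-byte binary once and returns 356.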

The Task.async_stream quirk

When Task.async_stream is run, a single “monitor” process, with no parents or callers, is spawned just to collect the results of the processes responsible for each element of the given enumerable. Let’s see an example; assume we invoke

> Task.async_stream([1, 2, 3, 4], &some_function/1) |> Enum.to_list()

and the process we invoke it from is #PID<0.111.0>. Since the stream is lazy, nothing runs until it’s enumerated (hence the Enum.to_list/1). Well then:

  • A “monitor process” is spawned, with no :"$ancestors" or :"$callers" entries in its dictionary. This process is linked to #PID<0.111.0>, though.
  • 4 processes (one for every element of the enumerable) are spawned. Each of these processes runs some_function(element). These processes:
    • Are linked to the “monitor process”
    • Have only one element in their :"$ancestors" entry: the monitor process
    • Have [#PID<0.111.0>] as their :"$callers" entry, that is, the original process that called Task.async_stream
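The :"$callers" part is easy to check yourself with a sketch like the one below, which returns the :"$callers" and :"$ancestors" entries seen by each worker process. AsyncStreamDemo is a hypothetical name, and I’d only rely on the :"$callers" part; what ends up in :"$ancestors" may depend on the Elixir version.

```elixir
defmodule AsyncStreamDemo do
  # Returns, for every element, the "$callers" and "$ancestors" entries seen
  # by the process Task.async_stream/2 used to run it.
  def worker_parents(enumerable) do
    enumerable
    |> Task.async_stream(fn _element ->
      {Process.get(:"$callers"), Process.get(:"$ancestors")}
    end)
    |> Enum.map(fn {:ok, parents} -> parents end)
  end
end
```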

Show me the code!!

With all this hard-won knowledge, I put together some code that I hope helps you …

defmodule MemoryMonitor do
  @moduledoc """
  This module implements a GenServer that monitors the memory usage of a process
  and its "process tree". It periodically checks and logs the memory consumption.
  """

  use GenServer
  require Logger

  @time_to_check :timer.seconds(10)

  @doc """
  Starts the MemoryMonitor process, linked to the caller process, to watch `pid_to_monitor`.
  """
  @spec start_link(pid()) :: GenServer.on_start()
  def start_link(pid_to_monitor) do
    GenServer.start_link(__MODULE__, pid_to_monitor)
  end

  @impl GenServer
  @spec init(pid) :: {:ok, pid}
  def init(pid) do
    Process.send_after(self(), :check, @time_to_check)
    Process.monitor(pid)

    {:ok, pid}
  end

  @doc """
  Handles incoming messages to the GenServer.
    - `:check` - Performs memory usage checks on the monitored process
    - `{:DOWN, ...}` - Stops the monitor when the monitored process dies
  """
  @impl GenServer
  def handle_info(:check, pid_to_monitor) do
    Process.send_after(self(), :check, @time_to_check)

    memory_used = get_memory_usage(pid_to_monitor)
    Logger.info("Process tree of #{inspect(pid_to_monitor)} uses ~#{memory_used} bytes")
    {:noreply, pid_to_monitor}
  end

  # when the monitored process dies, die
  def handle_info({:DOWN, _ref, :process, pid, _}, pid), do: {:stop, :normal, nil}

  @spec get_memory_usage(pid) :: non_neg_integer
  defp get_memory_usage(pid_to_monitor) do
    case get_processes_tree(pid_to_monitor, MapSet.new()) do
      # the monitored process is already gone
      {nil, _used_pids} ->
        0

      {processes_tree, _used_pids} ->
        {bin_mem_size, _} = get_bin_memory(processes_tree)
        bin_mem_size + get_heap_memory(processes_tree)
    end
  end

  # Returns {tree | nil, visited_pids}. The set of visited pids is threaded through
  # the whole traversal, so a process linked to several tree members is counted once.
  @spec get_processes_tree(pid | port, MapSet.t()) :: {map | nil, MapSet.t()}
  defp get_processes_tree(pid, used_pids) when is_pid(pid) do
    if MapSet.member?(used_pids, pid),
      do: {nil, used_pids},
      else: do_get_processes_tree(pid, used_pids)
  end

  # the linked resource may be a port, not a pid
  defp get_processes_tree(_port, used_pids), do: {nil, used_pids}

  @spec do_get_processes_tree(pid, MapSet.t()) :: {map | nil, MapSet.t()}
  defp do_get_processes_tree(pid, used_pids) do
    items = [:dictionary, :current_function, :status, :links, :memory, :binary]

    case Process.info(pid, items) do
      # the process died in the meantime
      nil ->
        {nil, used_pids}

      info ->
        {process_extra_info, process_info} =
          Keyword.split(info, [:dictionary, :current_function, :status])

        # the first process visited is the root; any other one must be a "child"
        if MapSet.size(used_pids) == 0 or child?(process_extra_info, used_pids) do
          used_pids = MapSet.put(used_pids, pid)
          {children, used_pids} = get_allowed_processes(process_info[:links], used_pids)
          {process_info |> Map.new() |> Map.put(:links, children), used_pids}
        else
          {nil, used_pids}
        end
    end
  end

  @spec get_allowed_processes([pid | port], MapSet.t()) :: {[map], MapSet.t()}
  defp get_allowed_processes(pids, used_pids) do
    {trees, used_pids} = Enum.map_reduce(pids, used_pids, &get_processes_tree/2)
    {Enum.reject(trees, &is_nil/1), used_pids}
  end

  @spec get_bin_memory(map, {integer, MapSet.t()}) :: {integer, MapSet.t()}
  defp get_bin_memory(
         %{binary: binaries, links: links},
         {mem_used, used_bin_refs} \\ {0, MapSet.new()}
       ) do
    {bin_mem, used_bin_refs} = Enum.reduce(binaries, {mem_used, used_bin_refs}, &maybe_sum/2)
    Enum.reduce(links, {bin_mem, used_bin_refs}, &get_bin_memory/2)
  end

  # adds the size of a binary only the first time its id is seen
  @spec maybe_sum({integer, integer, integer}, {integer, MapSet.t()}) :: {integer, MapSet.t()}
  defp maybe_sum({bin_ref, mem, _}, {total_mem, used_bin_refs}) do
    if MapSet.member?(used_bin_refs, bin_ref),
      do: {total_mem, used_bin_refs},
      else: {total_mem + mem, MapSet.put(used_bin_refs, bin_ref)}
  end

  @spec get_heap_memory(map) :: integer
  defp get_heap_memory(%{memory: mem, links: links}) do
    links
    |> Enum.map(&get_heap_memory/1)
    |> Enum.sum()
    |> Kernel.+(mem)
  end

  @spec child?(keyword, MapSet.t()) :: boolean
  defp child?(process_extra_info, used_pids) do
    dictionary = Keyword.get(process_extra_info, :dictionary)
    status = Keyword.get(process_extra_info, :status)
    {module, _, _} = Keyword.get(process_extra_info, :current_function)
    parents = get_parents(dictionary)
    # if it's the Task.async_stream monitoring process, then its status is :waiting
    !MapSet.disjoint?(parents, used_pids) || (module == Task.Supervised && status == :waiting)
  end

  @spec get_parents(keyword) :: MapSet.t()
  defp get_parents(dictionary) do
    ancestors = Keyword.get(dictionary, :"$ancestors", [])
    callers = Keyword.get(dictionary, :"$callers", [])
    MapSet.new(ancestors ++ callers)
  end
end