libs/memory-budgeter.lua

184 lines

1-- {{{ memory-budgeter.lua
2-- Issue 10-057: estimate-and-fit memory budgeter for threaded / GPU stages.
3--
-- WHAT IT DOES (for a CEO): before a stage spins up N parallel workers, it asks
-- "will this actually fit in the machine's free memory, or will it spill into swap
-- and crawl?" This library answers that with simple arithmetic and tells the stage
-- how many workers it can safely run. If even the shared data plus a single worker
-- won't fit, it does NOT stop the build -- it warns loudly that the run will use
-- swap (a sign the data should be shrunk, or the slowdown accepted) and proceeds at
-- one worker (or at the requested count when workers add no memory of their own).
10--
-- HOW: every stage's memory has the same shape -- a FIXED part loaded once (caches,
-- a model) plus a PER-WORKER part that scales with the worker count. Given those two
-- numbers and how much memory is free, the safe worker count is just
--   floor((free * headroom - fixed) / per_thread),
-- capped at the requested count and never allowed below 1.
-- The library knows nothing about WHAT the bytes are; each stage hands it the two
-- numbers and which pool (system RAM or GPU VRAM) to measure. RAM and VRAM differ
-- only in how "free" is measured -- the math is identical -- which is what makes one
-- budgeter reusable across every stage in the pipeline.
19--
20-- Two layers, kept separate so the arithmetic is testable without real memory:
21-- * compute_fit() -- pure: numbers in, decision out. No I/O.
22-- * fit_threads() -- wraps compute_fit() with a live memory probe and logging.
23--
24-- Usage:
25-- local budget = require("memory-budgeter")
26-- local threads = budget.fit_threads({
27-- pool = "ram", fixed = cache_bytes, per_thread = page_buf_bytes,
28-- want = requested_threads, label = "HTML",
29-- })
30-- }}}
31
32local utils = require("utils")
33
34local M = {}
35
-- {{{ format_bytes()
-- Human-readable size for log lines (decimal GB/MB/KB -- close enough for a log).
-- Accepts any number: non-integral floats (e.g. `available * headroom`, or a file
-- size multiplied by a parse factor) and negatives. The unit is chosen on the
-- magnitude so a negative size still scales. Sub-KB values use "%.0f" rather than
-- "%d": under Lua 5.3+ "%d" raises "number has no integer representation" for a
-- float such as 512.5, which would turn a log line into a crash.
local function format_bytes(n)
  local magnitude = math.abs(n)
  if magnitude >= 1e9 then return string.format("%.1f GB", n / 1e9) end
  if magnitude >= 1e6 then return string.format("%.0f MB", n / 1e6) end
  if magnitude >= 1e3 then return string.format("%.0f KB", n / 1e3) end
  return string.format("%.0f B", n)
end
-- }}}
45
-- {{{ probe_ram_bytes()
-- Returns free system RAM in bytes, read from the MemAvailable line of
-- /proc/meminfo. MemAvailable is the kernel's estimate of what can be allocated
-- without swapping (it includes reclaimable page cache), so it is a more honest
-- figure than MemFree. The file is read directly; no external command runs.
-- Raises an error when the file cannot be opened or carries no MemAvailable line:
-- a missing measurement is treated as a real problem, never replaced by a default.
local function probe_ram_bytes()
  local meminfo = io.open("/proc/meminfo", "r")
  if meminfo == nil then
    error("memory-budgeter: cannot open /proc/meminfo to size free RAM")
  end

  -- Scan line by line, stopping at the first MemAvailable entry or end of file.
  local available_kib
  repeat
    local line = meminfo:read("*l")
    local digits = line and line:match("^MemAvailable:%s+(%d+) kB")
    if digits then
      available_kib = tonumber(digits)
    end
  until line == nil or available_kib ~= nil
  meminfo:close()

  if available_kib == nil then
    error("memory-budgeter: /proc/meminfo has no MemAvailable field")
  end
  return available_kib * 1024
end
-- }}}
68
-- {{{ probe_vram_bytes()
-- Free GPU VRAM. Best-effort via nvidia-smi (read-only query). Non-NVIDIA GPUs have
-- no portable CLI, so this errors with a clear instruction rather than guessing -- a
-- caller on a different GPU passes its own `probe` function in the spec. The math
-- downstream is identical to RAM; only this measurement differs.

-- Converts one line of
-- `nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits` output into
-- bytes. The line must consist of nothing but an integer MiB count (surrounding
-- whitespace allowed). Anything else yields nil, so a digit run embedded in
-- non-numeric text (e.g. a PCI bus id inside an error message, or "[N/A]")
-- can never be mistaken for a free-memory figure. Also returns nil for nil input.
local function parse_free_mib_line(line)
  local mib = line and line:match("^%s*(%d+)%s*$")
  if mib then
    return tonumber(mib) * 1024 * 1024
  end
  return nil
end

-- Only the first output line is read, so on a multi-GPU host this measures the
-- first GPU nvidia-smi lists; other GPUs are not considered.
local function probe_vram_bytes()
  local pipe = io.popen("nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits")
  if pipe then
    local bytes = parse_free_mib_line(pipe:read("*l"))
    pipe:close()
    if bytes then return bytes end
  end
  error("memory-budgeter: could not query free VRAM (nvidia-smi unavailable on " ..
    "this host/GPU). Pass a custom `probe` function in the spec for this GPU.")
end
-- }}}
86
-- {{{ M.file_size_bytes()
-- On-disk size in bytes of the file at `path`, or nil when it cannot be opened
-- (e.g. it does not exist) -- whether a missing input is fatal is the caller's
-- call. Handy for the common "fixed = cache file size x parse factor" estimate.
-- The size comes from seeking the open handle to its end; no shell `stat` runs.
function M.file_size_bytes(path)
  local size = nil
  local handle = io.open(path, "rb")
  if handle then
    size = handle:seek("end")
    handle:close()
  end
  return size
end
-- }}}
99
-- {{{ M.default_probe()
-- Look up the built-in measurement for a pool name: "ram" resolves to the
-- /proc/meminfo probe and "vram" to the nvidia-smi probe. Any other value,
-- nil included, is an error. A stage can skip this lookup entirely by putting
-- its own `probe` in the spec (e.g. a Vulkan VRAM query).
function M.default_probe(pool)
  local builtin = ({ ram = probe_ram_bytes, vram = probe_vram_bytes })[pool]
  if builtin == nil then
    error("memory-budgeter: unknown pool '" .. tostring(pool) ..
      "' (expected 'ram' or 'vram', or pass a custom probe)")
  end
  return builtin
end
-- }}}
110
-- {{{ M.compute_fit()
-- The pure decision -- numbers in, plan out, no I/O so it is unit-testable. How many
-- of `want` workers fit in `available` bytes, leaving a `headroom` fraction free for
-- the OS, GC churn, and JSON parse spikes?
--
-- Parameters:
--   available   free bytes in the pool (as measured by a probe)
--   fixed       bytes loaded once and shared by all workers; nil is treated as 0
--   per_thread  bytes each worker adds; nil or <= 0 means workers add nothing
--   want        requested worker count; nil or anything below 1 becomes 1
--   headroom    fraction of `available` that may be used (default 0.7)
-- Returns a table:
--   threads  = the safe worker count (always >= 1)
--   reduced  = true if we cut below `want` to fit
--   swapping = true if even one worker overflows the budget (caller should WARN; the
--              fix is to shrink `fixed`, not to change the thread count)
--   budget   = available * headroom (the usable ceiling, for logging)
function M.compute_fit(available, fixed, per_thread, want, headroom)
  headroom = headroom or 0.7
  -- `fixed` is optional (fit_threads logs it as `spec.fixed or 0`); without this
  -- default a nil would raise in the comparison / subtraction below.
  fixed = fixed or 0
  want = math.max(1, want or 1)
  local budget = available * headroom

  -- A worker with no marginal cost cannot be limited by per-worker memory; only the
  -- shared `fixed` can overflow. Threads are then "free" memory-wise, so we keep the
  -- requested count and only flag swapping if `fixed` itself blows the budget.
  if not per_thread or per_thread <= 0 then
    return { threads = want, reduced = false, swapping = fixed > budget, budget = budget }
  end

  local safe = math.floor((budget - fixed) / per_thread)
  if safe < 1 then
    -- Even one worker plus the fixed cost overflows the headroom budget. No thread
    -- count rescues this; run at one worker and let the caller warn about swap.
    return { threads = 1, reduced = want > 1, swapping = true, budget = budget }
  end

  local threads = math.min(want, safe)
  return { threads = threads, reduced = threads < want, swapping = false, budget = budget }
end
-- }}}
143
-- {{{ M.fit_threads()
-- The live entry point: probe the pool, run compute_fit, log the estimate and the
-- decision, and return the safe worker count. spec fields:
--   pool        "ram" | "vram" (which memory to measure; ignored if `probe` given)
--   fixed       bytes loaded once, shared across workers (nil is treated as 0)
--   per_thread  bytes each worker adds (0/nil if workers are free)
--   want        requested worker count
--   headroom    optional fraction of free memory to use (default 0.7)
--   probe       optional function() -> free bytes (overrides the pool default)
--   label       optional short tag for the log lines (e.g. "HTML")
-- Raises when no `probe` is given and `pool` is not a built-in pool (via
-- M.default_probe), or when the probe returns something other than a number.
function M.fit_threads(spec)
  local probe = spec.probe or M.default_probe(spec.pool)
  local available = probe()
  -- A custom probe returning nil or garbage would otherwise surface as an opaque
  -- arithmetic error inside compute_fit; name the real culprit instead.
  if type(available) ~= "number" then
    error("memory-budgeter: probe returned " .. tostring(available) ..
      " instead of a free-byte count")
  end

  -- Normalize the optional sizes once so the decision and every log line agree.
  local fixed = spec.fixed or 0
  local per_thread = spec.per_thread or 0
  local fit = M.compute_fit(available, fixed, per_thread, spec.want, spec.headroom)

  local tag = "[budget" .. (spec.label and (":" .. spec.label) or "") .. "]"
  -- A nil `pool` can only get here with a caller-supplied probe (default_probe
  -- rejects nil), so the measured pool is not known to be RAM.
  local pool = spec.pool and tostring(spec.pool):upper() or "CUSTOM"
  utils.log_info(string.format(
    "%s %s free %s, budget %s; fixed %s + %d x %s/worker -> %d worker(s)",
    tag, pool, format_bytes(available), format_bytes(fit.budget),
    format_bytes(fixed), fit.threads, format_bytes(per_thread),
    fit.threads))

  if fit.swapping then
    -- compute_fit flags swapping when the shared data plus ONE worker exceeds the
    -- budget. It then returns 1 worker, or the full request when workers cost
    -- nothing, so report the actual count instead of assuming 1.
    utils.log_warn(string.format(
      "%s the shared data (%s) plus one worker (%s) does not fit the %s budget " ..
      "(%s) -- running at %d worker(s) and relying on SWAP. Shrink the shared " ..
      "data (shard/cap/stream it) or accept the slowdown. See issue 10-057.",
      tag, format_bytes(fixed), format_bytes(per_thread), pool,
      format_bytes(fit.budget), fit.threads))
  elseif fit.reduced then
    utils.log_info(string.format(
      "%s reduced workers %d -> %d to stay out of swap",
      tag, math.max(1, spec.want or 1), fit.threads))
  end

  return fit.threads
end
-- }}}
182
183return M
184