forked from lance-format/lance
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_memory_leaks.py
More file actions
96 lines (76 loc) · 2.93 KB
/
test_memory_leaks.py
File metadata and controls
96 lines (76 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
from __future__ import annotations
import gc
import os
from typing import Callable
import lance
import psutil
import pyarrow as pa
MiB = 1024 * 1024
def get_memory_usage() -> int:
    """Return the resident set size (RSS) reported by psutil for this process."""
    # Look the process up by its own PID, then read the RSS field from the
    # memory snapshot psutil returns for it.
    this_process = psutil.Process(os.getpid())
    snapshot = this_process.memory_info()
    return snapshot.rss
def assert_noleaks(
    operation: Callable[[], None],
    *,
    iterations: int = 100,
    warmup_iterations: int = 5,
    threshold_mb: float = 1.0,
    check_interval: int = 10,
    leeway_factor: float = 2.0,  # optional jitter cushion
) -> None:
    """Check if an operation retains memory across repeated executions.

    The operation is run ``warmup_iterations`` times before the baseline is
    sampled, so whatever those runs allocate is part of the baseline. It is
    then run ``iterations`` more times. Memory is sampled with
    ``get_memory_usage`` after a ``gc.collect()`` at every
    ``check_interval``-th completed run and once more after the last run.

    Args:
        operation: A callable that performs the operation to test
        iterations: Number of measured runs of the operation
        warmup_iterations: Number of warmup runs before the baseline is taken
        threshold_mb: Maximum allowed memory growth over the baseline, in MiB
        check_interval: Number of completed runs between interim memory checks
        leeway_factor: Multiplier applied to ``threshold_mb`` for the interim
            checks only; the final check uses ``threshold_mb`` unscaled

    Raises:
        ValueError: If ``iterations`` or ``check_interval`` is not positive
        AssertionError: If memory growth exceeds the allowed threshold
    """
    if iterations <= 0:
        raise ValueError("iterations must be > 0")
    if check_interval <= 0:
        raise ValueError("check_interval must be > 0")

    for _ in range(warmup_iterations):
        operation()

    gc.collect()
    baseline = get_memory_usage()

    # Count completed runs (1-based) so that the interim check fires after
    # exactly ``check_interval`` runs and the message reports the true number
    # of runs performed so far.
    for completed in range(1, iterations + 1):
        operation()
        # The last run is covered by the final (stricter) check below, so no
        # interim sample is taken for it.
        if completed < iterations and completed % check_interval == 0:
            gc.collect()
            current = get_memory_usage()
            growth_mb = (current - baseline) / MiB
            if growth_mb > threshold_mb * leeway_factor:
                raise AssertionError(
                    f"Possible leak: +{growth_mb:.2f} MiB after "
                    f"{completed}/{iterations} "
                    f"(threshold {threshold_mb:.2f} MiB; leeway x{leeway_factor}). "
                    f"rss_base={baseline}, rss_now={current}"
                )

    gc.collect()
    final = get_memory_usage()
    total_mb = (final - baseline) / MiB
    if total_mb > threshold_mb:
        avg = total_mb / iterations
        raise AssertionError(
            f"Memory leak detected: +{total_mb:.2f} MiB over {iterations} iterations "
            f"(threshold {threshold_mb:.2f} MiB; avg {avg:.4f} MiB/iter). "
            f"rss_base={baseline}, rss_final={final}"
        )
class TestMemoryLeaks:
    """Tests that repeated dataset operations do not grow process memory."""

    def test_index_statistics_no_leak(self, tmp_path) -> None:
        """Reopen a dataset and read its index statistics many times.

        A one-row dataset with a BTREE scalar index on ``id`` is written under
        ``tmp_path``; the repeated reads are then handed to ``assert_noleaks``.
        """
        dataset_path = str(tmp_path / "dataset")
        table = pa.table({"id": [1]})
        written = lance.write_dataset(table, dataset_path)
        written.create_scalar_index("id", index_type="BTREE")

        def read_all_index_stats() -> None:
            # Open the dataset anew on every call so each run goes through
            # the full open + stats path.
            reopened = lance.dataset(dataset_path)
            for index_info in reopened.list_indices():
                index_name = index_info.get("name")
                if index_name:
                    reopened.stats.index_stats(index_name)

        assert_noleaks(
            read_all_index_stats,
            iterations=1000,
            threshold_mb=2.0,
            check_interval=25,
        )