forked from lance-format/lance
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_tracing.py
More file actions
128 lines (105 loc) · 3.33 KB
/
test_tracing.py
File metadata and controls
128 lines (105 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
import glob
import os
import subprocess
import sys
import uuid
import pytest
@pytest.mark.skipif(
sys.platform == "win32",
reason="test fails in CI on Windows but passes locally on Windows",
)
def test_tracing():
trace_files_before = set(glob.glob("trace-*.json"))
subprocess.run(
[
sys.executable,
"-c",
"from lance.tracing import trace_to_chrome; trace_to_chrome()",
],
check=True,
env={
"LANCE_LOG": "debug",
},
)
trace_files_after = set(glob.glob("trace-*.json"))
assert len(trace_files_before) + 1 == len(trace_files_after)
new_trace_file = next(iter(trace_files_after - trace_files_before))
os.remove(new_trace_file)
some_uuid = uuid.uuid4()
trace_name = f"{some_uuid}.json"
subprocess.run(
[
sys.executable,
"-c",
"from lance.tracing import trace_to_chrome;"
+ f"trace_to_chrome(file='{trace_name}')",
],
check=True,
env={
"LANCE_LOG": "debug",
},
)
assert os.path.exists(trace_name)
os.remove(trace_name)
@pytest.mark.skipif(
sys.platform == "win32",
reason="test fails in CI on Windows but passes locally on Windows",
)
def test_tracing_callback(tmp_path):
script = tmp_path / "script.py"
script.write_text(
"""import lance
import pyarrow as pa
import time
from lance.tracing import capture_trace_events
def wait_until(condition, timeout=1, interval=0.01):
start = time.time()
while time.time() - start < timeout:
if condition():
return
time.sleep(interval)
raise TimeoutError(f"Condition not met after {timeout} seconds")
events = []
def callback(evt):
events.append(evt)
capture_trace_events(callback)
lance.write_dataset(pa.table({"x": range(100)}), "memory://test")
wait_until(lambda: len(events) == 6)
assert events[0].target == "lance::dataset_events"
assert events[0].args["event"] == "loading"
assert events[0].args["uri"] == "memory://test"
assert events[1].target == "lance::dataset_events"
assert events[1].args["event"] == "writing"
assert events[1].args["uri"] == "memory://test"
assert events[1].args["mode"] == "Create"
assert events[2].target == "lance::file_audit"
assert events[2].args["mode"] == "create"
assert events[2].args["type"] == "data"
assert events[3].target == "lance::dataset_events"
assert events[3].args["event"] == "loading"
assert events[3].args["uri"] == "memory://test"
assert events[4].target == "lance::file_audit"
assert events[4].args["mode"] == "create"
assert events[4].args["type"] == "manifest"
assert events[5].target == "lance::dataset_events"
assert events[5].args["event"] == "committed"
assert events[5].args["uri"] == "memory://test"
assert events[5].args["read_version"] == "0"
assert events[5].args["committed_version"] == "1"
assert events[5].args["detached"] == "false"
assert events[5].args["operation"] == "Overwrite"
for e in events:
assert e.args["timestamp"] is not None
assert e.args["client_version"] == lance.__version__
"""
)
subprocess.run(
[sys.executable, script],
capture_output=True,
check=True,
env={
"LANCE_LOG": "debug",
},
)