forked from lance-format/lance
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_packed_struct.py
More file actions
160 lines (123 loc) · 5.17 KB
/
test_packed_struct.py
File metadata and controls
160 lines (123 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
import random
from pathlib import Path
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pytest
from lance.file import LanceFileReader, LanceFileWriter
NUM_ROWS = 10_000_000
RANDOM_ACCESS = "indices"
NUM_INDICES = 1000
NUM_ROUNDS = 10
BATCH_SIZE = 16 * 1024
# This file compares benchmarks for reading and writing a StructArray column using
# (i) parquet
# (ii) the lance v2 format with default struct encoding
# (iii) the lance v2 format with a packed struct encoding
# We can test both random access and full scan access performance by
# setting RANDOM_ACCESS to "indices" or "full", respectively
@pytest.fixture(scope="module")
def test_data(tmp_path_factory):
    """Build a NUM_ROWS table with one struct column of four random float32 fields."""
    field_names = ["f1", "f2", "f3", "f4"]
    # One independent random float32 array per struct field.
    field_arrays = [pc.random(NUM_ROWS).cast(pa.float32()) for _ in field_names]
    struct_column = pa.StructArray.from_arrays(field_arrays, field_names)
    return pa.table({"struct_col": struct_column})
# generate NUM_INDICES random indices between 0 and NUM_ROWS for scanning
@pytest.fixture(scope="module")
def random_indices():
    """Return NUM_INDICES sorted random row indices in the half-open range [0, NUM_ROWS).

    Uses random.randrange so the upper bound is exclusive. The previous
    random.randint(0, NUM_ROWS) is inclusive on both ends and could yield
    index NUM_ROWS, which is out of range for a table with NUM_ROWS rows
    and would make the `.take(random_indices)` calls in the read benchmarks fail.
    """
    return sorted(random.randrange(NUM_ROWS) for _ in range(NUM_INDICES))
@pytest.mark.benchmark(group="read")
def test_parquet_read(tmp_path: Path, benchmark, test_data, random_indices):
    """Benchmark parquet reads: random row access or a full batched scan."""
    parquet_path = tmp_path / "data.parquet"
    pq.write_table(test_data, parquet_path)

    def take_rows():
        # Random access: read the whole file, then take the sampled rows.
        return pq.read_table(parquet_path).take(random_indices)

    def scan_all():
        # Full scan: stream fixed-size batches and reassemble a table.
        source = pq.ParquetFile(parquet_path)
        return pa.Table.from_batches(source.iter_batches(batch_size=BATCH_SIZE))

    if RANDOM_ACCESS == "indices":
        benchmark.pedantic(take_rows, rounds=5)
    elif RANDOM_ACCESS == "full":
        benchmark.pedantic(scan_all, rounds=5)
def read_lance_file_random(lance_path, random_indices):
    """Read the given row indices from a lance file, consuming every batch."""
    reader = LanceFileReader(lance_path)
    for _batch in reader.take_rows(indices=random_indices).to_batches():
        pass  # drain the iterator so the whole read is timed
def read_lance_file_full(lance_path):
    """Scan an entire lance file in BATCH_SIZE batches, consuming every batch."""
    reader = LanceFileReader(lance_path)
    for _batch in reader.read_all(batch_size=BATCH_SIZE).to_batches():
        pass  # drain the iterator so the whole read is timed
@pytest.mark.benchmark(group="read")
def test_lance_read(tmp_path: Path, benchmark, test_data, random_indices):
    """Benchmark lance reads with the default struct encoding."""
    lance_path = str(tmp_path) + "/lance_data"
    with LanceFileWriter(lance_path, test_data.schema) as writer:
        for record_batch in test_data.to_batches():
            writer.write_batch(record_batch)
    if RANDOM_ACCESS == "full":
        benchmark.pedantic(read_lance_file_full, args=(lance_path,), rounds=NUM_ROUNDS)
    elif RANDOM_ACCESS == "indices":
        benchmark.pedantic(
            read_lance_file_random, args=(lance_path, random_indices), rounds=NUM_ROUNDS
        )
@pytest.mark.benchmark(group="read")
def test_lance_read_packed(tmp_path: Path, benchmark, test_data, random_indices):
    """Benchmark lance reads with the packed struct encoding enabled."""
    lance_path = str(tmp_path) + "/lance_data"
    # Tag the struct column with the metadata that selects packed encoding.
    original_field = test_data.schema.field("struct_col")
    packed_field = pa.field(
        original_field.name, original_field.type, metadata={b"packed": b"true"}
    )
    packed_table = pa.Table.from_arrays(
        test_data.columns, schema=pa.schema([packed_field])
    )
    with LanceFileWriter(lance_path, packed_table.schema) as writer:
        for record_batch in packed_table.to_batches():
            writer.write_batch(record_batch)
    if RANDOM_ACCESS == "full":
        benchmark.pedantic(read_lance_file_full, args=(lance_path,), rounds=NUM_ROUNDS)
    elif RANDOM_ACCESS == "indices":
        benchmark.pedantic(
            read_lance_file_random, args=(lance_path, random_indices), rounds=NUM_ROUNDS
        )
@pytest.mark.benchmark(group="write")
def test_parquet_write(tmp_path: Path, benchmark, test_data):
    """Benchmark writing the struct table to parquet."""
    out_path = tmp_path / "data.parquet"
    benchmark.pedantic(pq.write_table, args=(test_data, out_path), rounds=NUM_ROUNDS)
def write_lance_file(lance_path, test_data):
    """Write the table to a lance v2.1 file, one record batch at a time."""
    with LanceFileWriter(lance_path, test_data.schema, version="2.1") as writer:
        for record_batch in test_data.to_batches():
            writer.write_batch(record_batch)
@pytest.mark.benchmark(group="write")
def test_lance_write(tmp_path: Path, benchmark, test_data):
    """Benchmark writing the struct table with the default struct encoding."""
    lance_path = str(tmp_path) + "/lance_data"
    benchmark.pedantic(write_lance_file, args=(lance_path, test_data), rounds=NUM_ROUNDS)
@pytest.mark.benchmark(group="write")
def test_lance_write_packed(tmp_path: Path, benchmark, test_data):
    """Benchmark writing the struct table with the packed struct encoding."""
    lance_path = str(tmp_path) + "/lance_data"
    # Tag the struct column with the metadata that selects packed encoding.
    original_field = test_data.schema.field("struct_col")
    packed_field = pa.field(
        original_field.name, original_field.type, metadata={b"packed": b"true"}
    )
    packed_table = pa.Table.from_arrays(
        test_data.columns, schema=pa.schema([packed_field])
    )
    benchmark.pedantic(
        write_lance_file, args=(lance_path, packed_table), rounds=NUM_ROUNDS
    )