forked from lance-format/lance
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfile.py
More file actions
510 lines (442 loc) · 16.5 KB
/
file.py
File metadata and controls
510 lines (442 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
from pathlib import Path
from typing import Dict, List, Optional, Union
import pyarrow as pa
from .io import StorageOptionsProvider
from .lance import (
LanceBufferDescriptor,
LanceColumnMetadata,
LanceFileMetadata,
LanceFileStatistics,
LancePageMetadata,
stable_version,
)
from .lance import (
LanceFileReader as _LanceFileReader,
)
from .lance import (
LanceFileSession as _LanceFileSession,
)
from .lance import (
LanceFileWriter as _LanceFileWriter,
)
class ReaderResults:
    """
    Adapter that exposes Lance's internal query results (a pyarrow
    RecordBatchReader) in caller-friendly formats such as a pyarrow Table.
    """

    def __init__(self, reader: pa.RecordBatchReader):
        """
        Internal constructor; not intended for external use.
        """
        self.reader = reader

    def to_batches(self) -> pa.RecordBatchReader:
        """Return the results as a streaming pyarrow RecordBatchReader."""
        return self.reader

    def to_table(self) -> pa.Table:
        """Materialize all result batches into a single pyarrow Table."""
        return self.reader.read_all()
class LanceFileReader:
    """
    A file reader for reading Lance files

    This class is used to read Lance data files, a low level structure
    optimized for storing multi-modal tabular data. If you are working with
    Lance datasets then you should use the LanceDataset class instead.
    """

    def __init__(
        self,
        path: Union[str, Path],
        storage_options: Optional[Dict[str, str]] = None,
        columns: Optional[List[str]] = None,
        *,
        storage_options_provider: Optional[StorageOptionsProvider] = None,
        _inner_reader: Optional[_LanceFileReader] = None,
    ):
        """
        Creates a new file reader to read the given file

        Parameters
        ----------
        path: str or Path
            The path to read, can be a pathname for local storage
            or a URI to read from cloud storage.
        storage_options : optional, dict
            Extra options to be used for a particular storage connection. This is
            used to store connection parameters like credentials, endpoint, etc.
        storage_options_provider : optional
            A provider that can provide storage options dynamically. This is useful
            for credentials that need to be refreshed or vended on-demand.
        columns: list of str, default None
            List of column names to be fetched.
            All columns are fetched if None or unspecified.
        """
        # A pre-built native reader (supplied by LanceFileSession.open_reader)
        # takes precedence; in that case `path` and the other options are ignored.
        if _inner_reader is not None:
            self._reader = _inner_reader
        else:
            if isinstance(path, Path):
                path = str(path)
            self._reader = _LanceFileReader(
                path,
                storage_options=storage_options,
                storage_options_provider=storage_options_provider,
                columns=columns,
            )

    def read_all(
        self, *, batch_size: int = 1024, batch_readahead: int = 16
    ) -> ReaderResults:
        """
        Reads the entire file

        Parameters
        ----------
        batch_size: int, default 1024
            The file will be read in batches. This parameter controls
            how many rows will be in each batch (except the final batch)

            Smaller batches will use less memory but might be slightly
            slower because there is more per-batch overhead
        batch_readahead: int, default 16
            The number of batches to read ahead while decoding.
        """
        return ReaderResults(self._reader.read_all(batch_size, batch_readahead))

    def read_range(
        self,
        start: int,
        num_rows: int,
        *,
        batch_size: int = 1024,
        batch_readahead: int = 16,
    ) -> ReaderResults:
        """
        Read a range of rows from the file

        Parameters
        ----------
        start: int
            The offset of the first row to start reading
        num_rows: int
            The number of rows to read from the file
        batch_size: int, default 1024
            The file will be read in batches. This parameter controls
            how many rows will be in each batch (except the final batch)

            Smaller batches will use less memory but might be slightly
            slower because there is more per-batch overhead
        batch_readahead: int, default 16
            The number of batches to read ahead while decoding.
        """
        return ReaderResults(
            self._reader.read_range(start, num_rows, batch_size, batch_readahead)
        )

    def take_rows(
        self,
        indices: List[int],
        *,
        batch_size: int = 1024,
        batch_readahead: int = 16,
    ) -> ReaderResults:
        """
        Read a specific set of rows from the file

        Parameters
        ----------
        indices: List[int]
            The indices of the rows to read from the file in ascending order
        batch_size: int, default 1024
            The file will be read in batches. This parameter controls
            how many rows will be in each batch (except the final batch)

            Smaller batches will use less memory but might be slightly
            slower because there is more per-batch overhead
        batch_readahead: int, default 16
            The number of batches to read ahead while decoding.

        Raises
        ------
        ValueError
            If ``indices`` is not sorted in ascending order.
        """
        # Validate ordering up front so the caller gets a clear Python-level
        # error instead of a lower-level failure from the native reader.
        for i in range(len(indices) - 1):
            if indices[i] > indices[i + 1]:
                # Note: the message is a single clean line; the previous
                # backslash-continued f-string embedded a run of stray spaces.
                raise ValueError(
                    "Indices must be sorted in ascending order for file API, "
                    f"got {indices[i]} > {indices[i + 1]}"
                )
        return ReaderResults(
            self._reader.take_rows(indices, batch_size, batch_readahead)
        )

    def metadata(self) -> LanceFileMetadata:
        """
        Return metadata describing the file contents
        """
        return self._reader.metadata()

    def file_statistics(self) -> LanceFileStatistics:
        """
        Return file statistics of the file
        """
        return self._reader.file_statistics()

    def read_global_buffer(self, index: int) -> bytes:
        """
        Read a global buffer from the file at a given index

        Parameters
        ----------
        index: int
            The index of the global buffer to read

        Returns
        -------
        bytes
            The contents of the global buffer
        """
        return self._reader.read_global_buffer(index)

    def num_rows(self) -> int:
        """Return the number of rows belonging to the data file."""
        return self._reader.num_rows()
class LanceFileSession:
    """
    A file session for reading and writing Lance files.

    If you plan on opening many readers or writers then creating a session first can
    be more efficient as it will share the underlying object_store configuration with
    all of the readers and writers.
    """

    def __init__(
        self,
        base_path: str,
        storage_options: Optional[Dict[str, str]] = None,
        storage_options_provider: Optional[StorageOptionsProvider] = None,
    ):
        """
        Creates a new file session

        Parameters
        ----------
        base_path: str
            The base path to read from. Can be a pathname for local storage
            or a URI to read from cloud storage. All readers will be opened relative
            to this base path.
        storage_options : optional, dict
            Extra options to be used for a particular storage connection. This is
            used to store connection parameters like credentials, endpoint, etc.
        storage_options_provider : optional
            A provider that can provide storage options dynamically. This is useful
            for credentials that need to be refreshed or vended on-demand.
        """
        # Accept pathlib.Path for convenience; the native layer wants a str.
        base_path = str(base_path) if isinstance(base_path, Path) else base_path
        self._session = _LanceFileSession(
            base_path,
            storage_options=storage_options,
            storage_options_provider=storage_options_provider,
        )

    def open_reader(
        self, path: str, columns: Optional[List[str]] = None
    ) -> LanceFileReader:
        """
        Opens a new reader for the given path

        The path will be appended to the base path of the session.
        """
        inner = self._session.open_reader(path, columns)
        # The path argument is unused when an inner reader is supplied.
        return LanceFileReader(
            None,  # pyright: ignore[reportArgumentType]
            _inner_reader=inner,
        )

    def open_writer(
        self,
        path: str,
        *,
        schema: Optional[pa.Schema] = None,
        data_cache_bytes: Optional[int] = None,
        version: Optional[str] = None,
        keep_original_array: Optional[bool] = None,
        max_page_bytes: Optional[int] = None,
    ) -> "LanceFileWriter":
        """
        Opens a new writer for the given path (relative to this session's base path),
        reusing the session's underlying object store.

        Parameters
        ----------
        path : str
            Path relative to `base_path` where the file will be written.
        schema : pyarrow.Schema, optional
            If provided, creates a schema-bound writer; otherwise a lazy writer is
            created.
        data_cache_bytes : int, optional
            Size of the row-group/page write cache in bytes.
        version : str, optional
            Lance file format version (e.g. "2"). Parsed by the Rust layer.
        keep_original_array : bool, optional
            If True, retain original arrays in the writer (advanced/diagnostic).
        max_page_bytes : int, optional
            Target max page size in bytes.

        Returns
        -------
        LanceFileWriter
        """
        # The path argument is unused when an inner writer is supplied.
        return LanceFileWriter(
            None,  # pyright: ignore[reportArgumentType]
            _inner_writer=self._session.open_writer(
                path,
                schema,  # pyarrow.Schema or None
                data_cache_bytes,
                version,
                keep_original_array,
                max_page_bytes,
            ),
        )

    def contains(self, path: str) -> bool:
        """
        Check if a file exists at the given path (relative to this session's base path).

        Parameters
        ----------
        path : str
            Path relative to `base_path` to check for existence.

        Returns
        -------
        bool
            True if the file exists, False otherwise.
        """
        return self._session.contains(path)

    def list(self, path: Optional[str] = None) -> List[str]:
        """
        List all files at the given path (relative to this session's base path).

        Parameters
        ----------
        path : str, optional
            Path relative to `base_path` to list files from. If None, lists files
            from the base path.

        Returns
        -------
        List[str]
            List of file paths.
        """
        return self._session.list(path)

    def upload_file(self, local_path: Union[str, Path], remote_path: str) -> None:
        """
        Upload a file from local filesystem to the object store.

        Parameters
        ----------
        local_path : str or Path
            Local file path to upload.
        remote_path : str
            Remote path relative to session's base_path.
        """
        local_path = str(local_path) if isinstance(local_path, Path) else local_path
        self._session.upload_file(local_path, remote_path)

    def download_file(self, remote_path: str, local_path: Union[str, Path]) -> None:
        """
        Download a file from object store to local filesystem.

        Parameters
        ----------
        remote_path : str
            Remote path relative to session's base_path.
        local_path : str or Path
            Local file path where the file will be saved.
        """
        local_path = str(local_path) if isinstance(local_path, Path) else local_path
        self._session.download_file(remote_path, local_path)
class LanceFileWriter:
    """
    A file writer for writing Lance data files

    This class is used to write Lance data files, a low level structure
    optimized for storing multi-modal tabular data. If you are working with
    Lance datasets then you should use the LanceDataset class instead.
    """

    def __init__(
        self,
        path: Union[str, Path],
        schema: Optional[pa.Schema] = None,
        *,
        data_cache_bytes: Optional[int] = None,
        version: Optional[str] = None,
        storage_options: Optional[Dict[str, str]] = None,
        storage_options_provider: Optional[StorageOptionsProvider] = None,
        max_page_bytes: Optional[int] = None,
        _inner_writer: Optional[_LanceFileWriter] = None,
        **kwargs,
    ):
        """
        Create a new LanceFileWriter to write to the given path

        Parameters
        ----------
        path: str or Path
            The path to write to. Can be a pathname for local storage
            or a URI for remote storage.
        schema: pa.Schema
            The schema of data that will be written. If not specified then
            the schema will be inferred from the first batch. If the schema
            is not specified and no data is written then the write will fail.
        data_cache_bytes: int
            How many bytes (per column) to cache before writing a page. The
            default is an appropriate value based on the filesystem.
        version: str
            The version of the file format to write. If not specified then
            the latest stable version will be used. Newer versions are more
            efficient but may not be readable by older versions of the software.
        storage_options : optional, dict
            Extra options to be used for a particular storage connection. This is
            used to store connection parameters like credentials, endpoint, etc.
        storage_options_provider : optional, StorageOptionsProvider
            A storage options provider that can fetch and refresh storage options
            dynamically. This is useful for credentials that expire and need to be
            refreshed automatically.
        max_page_bytes : optional, int
            The maximum size of a page in bytes, if a single array would create a
            page larger than this then it will be split into multiple pages. The
            default value is 32MB.
        """
        # A pre-built native writer (supplied by LanceFileSession.open_writer)
        # takes precedence; in that case `path` and the other options are ignored.
        if _inner_writer is not None:
            self._writer = _inner_writer
        else:
            if isinstance(path, Path):
                path = str(path)
            self._writer = _LanceFileWriter(
                path,
                schema,
                data_cache_bytes=data_cache_bytes,
                version=version,
                storage_options=storage_options,
                storage_options_provider=storage_options_provider,
                max_page_bytes=max_page_bytes,
                **kwargs,
            )
        # Tracks whether close() has been called; makes close() idempotent.
        self.closed = False

    def write_batch(self, batch: Union[pa.RecordBatch, pa.Table]) -> None:
        """
        Write a batch of data to the file

        Parameters
        ----------
        batch: Union[pa.RecordBatch, pa.Table]
            The data to write to the file
        """
        if isinstance(batch, pa.Table):
            # Use a distinct loop variable; the original shadowed `batch`.
            for record_batch in batch.to_batches():
                self._writer.write_batch(record_batch)
        else:
            self._writer.write_batch(batch)

    def close(self) -> Optional[int]:
        """
        Write the file metadata and close the file

        Returns the number of rows written to the file.  Safe to call more
        than once; subsequent calls are no-ops and return None.
        """
        if self.closed:
            return None
        self.closed = True
        return self._writer.finish()

    def add_schema_metadata(self, key: str, value: str) -> None:
        """
        Add a metadata (key/value pair) entry to the schema. This method allows you to
        alter the schema metadata. It must be called before `close` is called.

        Parameters
        ----------
        key: str
            The key to add.
        value: str
            The value to add.
        """
        self._writer.add_schema_metadata(key, value)

    def add_global_buffer(self, data: bytes) -> int:
        """
        Add a global buffer to the file. The global buffer can contain any
        arbitrary bytes.

        Parameters
        ----------
        data: bytes
            The data to write to the file.

        Returns
        -------
        int
            The index of the global buffer. This will always start at 1
            and increment by 1 each time this method is called.
        """
        return self._writer.add_global_buffer(data)

    def __enter__(self) -> "LanceFileWriter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Ensure the file is finalized even if the body raised.
        self.close()
# Public API surface of this module.  LanceFileSession was previously missing,
# so `from ... import *` and API-doc tooling silently skipped it.
__all__ = [
    "LanceFileReader",
    "LanceFileSession",
    "LanceFileWriter",
    "LanceFileMetadata",
    "LanceColumnMetadata",
    "LancePageMetadata",
    "LanceBufferDescriptor",
    "LanceFileStatistics",
    "stable_version",
]