Skip to content

Commit 427206c

Browse files
steFaiz and wayneli-vt authored
feat: distributed range-based BTree index (lance-format#5202)
This PR introduces a range-based BTree index. Some test results:

| num of rows | num of ranges | execution time | merge time |
|:-----------:|:-------------:|:--------------:|:----------:|
| 130 million | 3 | 23 min | 1 s |
| 130 million | 50 | 3 min | 3 s |
| 10 billion | 1000 | 15 min | 46 s |

Please refer to lance-format#5164 for more details.

---------

Co-authored-by: Weiren <litaiwei.lwt@antgroup.com>
1 parent 9782e9a commit 427206c

14 files changed

Lines changed: 1260 additions & 113 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/lance-jni/Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/lance-jni/src/blocking_dataset.rs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use lance::io::{ObjectStore, ObjectStoreParams};
3939
use lance::table::format::Fragment;
4040
use lance::table::format::IndexMetadata;
4141
use lance_core::datatypes::Schema as LanceSchema;
42+
use lance_index::scalar::btree::BTreeParameters;
4243
use lance_index::scalar::lance_format::LanceIndexStore;
4344
use lance_index::DatasetIndexExt;
4445
use lance_index::{IndexParams, IndexType};
@@ -718,12 +719,13 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCreateIndex(
718719
java_dataset: JObject,
719720
columns_jobj: JObject, // List<String>
720721
index_type_code_jobj: jint,
721-
name_jobj: JObject, // Optional<String>
722-
params_jobj: JObject, // IndexParams
723-
replace_jobj: jboolean, // replace
724-
train_jobj: jboolean, // train
725-
fragments_jobj: JObject, // List<Integer>
726-
index_uuid_jobj: JObject, // String
722+
name_jobj: JObject, // Optional<String>
723+
params_jobj: JObject, // IndexParams
724+
replace_jobj: jboolean, // replace
725+
train_jobj: jboolean, // train
726+
fragments_jobj: JObject, // List<Integer>
727+
index_uuid_jobj: JObject, // String
728+
arrow_stream_addr_jobj: JObject, // Optional<Long>
727729
) {
728730
ok_or_throw_without_return!(
729731
env,
@@ -737,7 +739,8 @@ pub extern "system" fn Java_org_lance_Dataset_nativeCreateIndex(
737739
replace_jobj,
738740
train_jobj,
739741
fragments_jobj,
740-
index_uuid_jobj
742+
index_uuid_jobj,
743+
arrow_stream_addr_jobj,
741744
)
742745
);
743746
}
@@ -748,12 +751,13 @@ fn inner_create_index(
748751
java_dataset: JObject,
749752
columns_jobj: JObject, // List<String>
750753
index_type_code_jobj: jint,
751-
name_jobj: JObject, // Optional<String>
752-
params_jobj: JObject, // IndexParams
753-
replace_jobj: jboolean, // replace
754-
train_jobj: jboolean, // train
755-
fragments_jobj: JObject, // Optional<List<String>>
756-
index_uuid_jobj: JObject, // Optional<String>
754+
name_jobj: JObject, // Optional<String>
755+
params_jobj: JObject, // IndexParams
756+
replace_jobj: jboolean, // replace
757+
train_jobj: jboolean, // train
758+
fragments_jobj: JObject, // Optional<List<Integer>>
759+
index_uuid_jobj: JObject, // Optional<String>
760+
arrow_stream_addr_jobj: JObject, // Optional<Long>
757761
) -> Result<()> {
758762
let columns = env.get_strings(&columns_jobj)?;
759763
let index_type = IndexType::try_from(index_type_code_jobj)?;
@@ -765,6 +769,17 @@ fn inner_create_index(
765769
.get_ints_opt(&fragments_jobj)?
766770
.map(|vec| vec.into_iter().map(|i| i as u32).collect());
767771
let index_uuid = env.get_string_opt(&index_uuid_jobj)?;
772+
let arrow_stream_addr_opt = env.get_long_opt(&arrow_stream_addr_jobj)?;
773+
let batch_reader = if let Some(arrow_stream_addr) = arrow_stream_addr_opt {
774+
let stream_ptr = arrow_stream_addr as *mut FFI_ArrowArrayStream;
775+
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
776+
Some(reader)
777+
} else {
778+
None
779+
};
780+
781+
// we should skip committing index when building distributed indices.
782+
let mut skip_commit = fragment_ids.is_some();
768783

769784
// Handle scalar vs vector indices differently and get params before borrowing dataset
770785
let params_result: Result<Box<dyn IndexParams>> = match index_type {
@@ -780,8 +795,9 @@ fn inner_create_index(
780795
let (index_type_str, params_opt) = get_scalar_index_params(env, params_jobj)?;
781796
let scalar_params = lance_index::scalar::ScalarIndexParams {
782797
index_type: index_type_str,
783-
params: params_opt,
798+
params: params_opt.clone(),
784799
};
800+
skip_commit = skip_commit || should_skip_commit(index_type, &params_opt)?;
785801
Ok(Box::new(scalar_params))
786802
}
787803
IndexType::FragmentReuse | IndexType::MemWal => {
@@ -818,8 +834,6 @@ fn inner_create_index(
818834
index_builder = index_builder.name(name);
819835
}
820836

821-
let has_fragment_ids = fragment_ids.is_some();
822-
823837
if let Some(fragment_ids) = fragment_ids {
824838
index_builder = index_builder.fragments(fragment_ids);
825839
}
@@ -828,7 +842,11 @@ fn inner_create_index(
828842
index_builder = index_builder.index_uuid(index_uuid);
829843
}
830844

831-
if has_fragment_ids {
845+
if let Some(reader) = batch_reader {
846+
index_builder = index_builder.preprocessed_data(Box::new(reader));
847+
}
848+
849+
if skip_commit {
832850
RT.block_on(index_builder.execute_uncommitted())?;
833851
} else {
834852
RT.block_on(index_builder.into_future())?
@@ -837,6 +855,20 @@ fn inner_create_index(
837855
Ok(())
838856
}
839857

858+
fn should_skip_commit(index_type: IndexType, params_opt: &Option<String>) -> Result<bool> {
859+
match index_type {
860+
IndexType::BTree => {
861+
// Should defer the commit if we are building range-based BTree index
862+
if let Some(params) = params_opt {
863+
let btree_parameters = serde_json::from_str::<BTreeParameters>(params)?;
864+
return Ok(btree_parameters.range_id.is_some());
865+
}
866+
Ok(false)
867+
}
868+
_ => Ok(false),
869+
}
870+
}
871+
840872
#[no_mangle]
841873
pub extern "system" fn Java_org_lance_Dataset_innerMergeIndexMetadata<'local>(
842874
mut env: JNIEnv<'local>,

java/src/main/java/org/lance/Dataset.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,8 @@ public void createIndex(IndexOptions options) {
783783
options.isReplace(),
784784
options.isTrain(),
785785
options.getFragmentIds(),
786-
options.getIndexUUID());
786+
options.getIndexUUID(),
787+
options.getPreprocessedData().map(ArrowArrayStream::memoryAddress));
787788
}
788789
}
789790

@@ -795,7 +796,8 @@ private native void nativeCreateIndex(
795796
boolean replace,
796797
boolean train,
797798
Optional<List<Integer>> fragments,
798-
Optional<String> indexUUID);
799+
Optional<String> indexUUID,
800+
Optional<Long> arrowStreamMemoryAddress);
799801

800802
public void mergeIndexMetadata(
801803
String indexUUID, IndexType indexType, Optional<Integer> batchReadHead) {

java/src/main/java/org/lance/index/IndexOptions.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package org.lance.index;
1515

16+
import org.apache.arrow.c.ArrowArrayStream;
1617
import org.apache.arrow.util.Preconditions;
1718

1819
import java.util.List;
@@ -28,6 +29,7 @@ public class IndexOptions {
2829
private final List<String> columns;
2930
private final IndexType indexType;
3031
private final IndexParams indexParams;
32+
private final ArrowArrayStream preprocessedData;
3133

3234
private IndexOptions(
3335
String indexName,
@@ -37,7 +39,8 @@ private IndexOptions(
3739
boolean replace,
3840
boolean train,
3941
List<Integer> fragmentIds,
40-
String indexUUID) {
42+
String indexUUID,
43+
ArrowArrayStream preprocessedData) {
4144
this.replace = replace;
4245
this.train = train;
4346
this.fragmentIds = fragmentIds;
@@ -46,6 +49,7 @@ private IndexOptions(
4649
this.columns = columns;
4750
this.indexType = indexType;
4851
this.indexParams = indexParams;
52+
this.preprocessedData = preprocessedData;
4953
}
5054

5155
public Optional<String> getIndexUUID() {
@@ -80,6 +84,10 @@ public List<String> getColumns() {
8084
return columns;
8185
}
8286

87+
public Optional<ArrowArrayStream> getPreprocessedData() {
88+
return Optional.ofNullable(preprocessedData);
89+
}
90+
8391
public static Builder builder(
8492
List<String> columns, IndexType indexType, IndexParams indexParams) {
8593
return new Builder(columns, indexType, indexParams);
@@ -92,6 +100,7 @@ public static class Builder {
92100
private List<Integer> fragmentIds = null;
93101
private String indexUUID = null;
94102
private String indexName = null;
103+
private ArrowArrayStream preprocessedData = null;
95104
private final List<String> columns;
96105
private final IndexType indexType;
97106
private final IndexParams indexParams;
@@ -158,9 +167,28 @@ public Builder withIndexName(String indexName) {
158167
return this;
159168
}
160169

170+
/**
171+
* Optional preprocessed data. Some index types can consume it to avoid heavy computation, e.g.
172+
* for a range-based BTree index, data can be range-partitioned and sorted by a distributed engine.
173+
*
174+
* @param preprocessedData preprocessed data.
175+
*/
176+
public Builder withPreprocessedData(ArrowArrayStream preprocessedData) {
177+
this.preprocessedData = preprocessedData;
178+
return this;
179+
}
180+
161181
public IndexOptions build() {
162182
return new IndexOptions(
163-
indexName, columns, indexType, indexParams, replace, train, fragmentIds, indexUUID);
183+
indexName,
184+
columns,
185+
indexType,
186+
indexParams,
187+
replace,
188+
train,
189+
fragmentIds,
190+
indexUUID,
191+
preprocessedData);
164192
}
165193
}
166194
}

0 commit comments

Comments
 (0)