From 59084143efb95924034bb1ed7da31f4437d433fc Mon Sep 17 00:00:00 2001 From: PSeitz Date: Wed, 19 Jun 2024 15:54:12 +0900 Subject: [PATCH] use optional index in multivalued index (#2439) * use optional index in multivalued index For mostly empty multivalued indices there was a large overhead during creation when iterating all docids. This is alleviated by placing an optional index in the multivalued index to mark documents that have values. There's some performance overhead when accessing values in a multivalued index. The accessing cost is now optional index + multivalue index. The sparse codec performs relatively bad with the binary_search when accessing data. This is reflected in the benchmarks below. This changes the format of columnar to v2, but code is added to handle the v1 formats. ``` Running benches/bench_access.rs (/home/pascal/Development/tantivy/optional_multivalues/target/release/deps/bench_access-ea323c028db88db4) multi sparse 1/13 access_values_for_doc Avg: 42.8946ms (+241.80%) Median: 42.8869ms (+244.10%) [42.7484ms .. 43.1074ms] access_first_vals Avg: 42.8022ms (+421.93%) Median: 42.7553ms (+439.84%) [42.6794ms .. 43.7404ms] multi 2x access_values_for_doc Avg: 31.1244ms (+24.17%) Median: 30.8339ms (+23.46%) [30.7192ms .. 33.6059ms] access_first_vals Avg: 24.3070ms (+70.92%) Median: 24.0966ms (+70.18%) [23.9328ms .. 26.4851ms] sparse 1/13 access_values_for_doc Avg: 42.2490ms (+0.61%) Median: 42.2346ms (+2.28%) [41.8988ms .. 43.7821ms] access_first_vals Avg: 43.6272ms (+0.23%) Median: 43.6197ms (+1.78%) [43.4920ms .. 43.9009ms] dense 1/12 access_values_for_doc Avg: 8.6184ms (+23.18%) Median: 8.6126ms (+23.78%) [8.5843ms .. 8.7527ms] access_first_vals Avg: 6.8112ms (+4.47%) Median: 6.8002ms (+4.55%) [6.7887ms .. 6.8991ms] full access_values_for_doc Avg: 9.4073ms (-5.09%) Median: 9.4023ms (-2.23%) [9.3694ms .. 9.4568ms] access_first_vals Avg: 4.9531ms (+6.24%) Median: 4.9502ms (+7.85%) [4.9423ms .. 4.9718ms] ``` ``` Running benches/bench_merge.rs (/home/pascal/Development/tantivy/optional_multivalues/target/release/deps/bench_merge-475697dfceb3639f) merge_multi 2x_and_multi 2x Avg: 20.2280ms (+34.33%) Median: 20.1829ms (+35.33%) [19.9933ms .. 20.8806ms] merge_multi sparse 1/13_and_multi sparse 1/13 Avg: 0.8961ms (-78.04%) Median: 0.8943ms (-77.61%) [0.8899ms .. 0.9272ms] merge_dense 1/12_and_dense 1/12 Avg: 0.6619ms (-1.26%) Median: 0.6616ms (+2.20%) [0.6473ms .. 0.6837ms] merge_sparse 1/13_and_sparse 1/13 Avg: 0.5508ms (-0.85%) Median: 0.5508ms (+2.80%) [0.5420ms .. 0.5634ms] merge_sparse 1/13_and_dense 1/12 Avg: 0.6046ms (-4.64%) Median: 0.6038ms (+2.80%) [0.5939ms .. 0.6296ms] merge_multi sparse 1/13_and_dense 1/12 Avg: 0.9111ms (-83.48%) Median: 0.9063ms (-83.50%) [0.9047ms .. 0.9663ms] merge_multi sparse 1/13_and_sparse 1/13 Avg: 0.8451ms (-89.49%) Median: 0.8428ms (-89.43%) [0.8411ms .. 0.8563ms] merge_multi 2x_and_dense 1/12 Avg: 10.6624ms (-4.82%) Median: 10.6568ms (-4.49%) [10.5738ms .. 10.8353ms] merge_multi 2x_and_sparse 1/13 Avg: 10.6336ms (-22.95%) Median: 10.5925ms (-22.33%) [10.5149ms .. 11.5657ms] ``` * Update columnar/src/columnar/format_version.rs Co-authored-by: Paul Masurel * Update columnar/src/column_index/mod.rs Co-authored-by: Paul Masurel --------- Co-authored-by: Paul Masurel --- columnar/benches/bench_access.rs | 73 +--- columnar/benches/bench_merge.rs | 8 +- columnar/benches/common.rs | 59 ++++ columnar/compat_tests_data/v1.columnar | Bin 32092 -> 31754 bytes columnar/compat_tests_data/v2.columnar | Bin 0 -> 42007 bytes columnar/src/column/mod.rs | 2 +- columnar/src/column/serialize.rs | 27 +- columnar/src/column_index/merge/mod.rs | 19 +- columnar/src/column_index/merge/shuffled.rs | 44 ++- columnar/src/column_index/merge/stacked.rs | 230 +++++++------ columnar/src/column_index/mod.rs | 49 ++- .../src/column_index/multivalued_index.rs | 312 ++++++++++++++++-- .../src/column_index/optional_index/mod.rs | 8 +- .../src/column_index/optional_index/set.rs | 5 +- .../optional_index/set_block/tests.rs | 42 ++- columnar/src/column_index/serialize.rs | 43 ++- columnar/src/columnar/format_version.rs | 14 +- columnar/src/columnar/merge/merge_mapping.rs | 1 - columnar/src/columnar/merge/mod.rs | 17 +- columnar/src/columnar/merge/tests.rs | 2 + columnar/src/columnar/reader/mod.rs | 13 +- columnar/src/columnar/writer/mod.rs | 44 +-- columnar/src/columnar/writer/value_index.rs | 81 +++-- columnar/src/compat_tests.rs | 144 ++++++-- columnar/src/dynamic_column.rs | 46 ++- columnar/src/iterable.rs | 9 + columnar/src/tests.rs | 79 ++++- src/indexer/index_writer.rs | 2 +- 28 files changed, 1007 insertions(+), 366 deletions(-) create mode 100644 columnar/benches/common.rs create mode 100644 columnar/compat_tests_data/v2.columnar diff --git a/columnar/benches/bench_access.rs b/columnar/benches/bench_access.rs index 571d949ec8..e40d3f179d 100644 --- a/columnar/benches/bench_access.rs +++ b/columnar/benches/bench_access.rs @@ -1,73 +1,13 @@ -use core::fmt; -use std::fmt::{Display, Formatter}; - use binggan::{black_box, InputGroup}; -use tantivy_columnar::*; +use common::*; +use tantivy_columnar::Column; -pub enum Card { - MultiSparse, - Multi, - Sparse, - Dense, - Full, -} -impl Display for Card { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - match self { - Card::MultiSparse => write!(f, "multi sparse 1/13"), - Card::Multi => write!(f, "multi 2x"), - Card::Sparse => write!(f, "sparse 1/13"), - Card::Dense => write!(f, "dense 1/12"), - Card::Full => write!(f, "full"), - } - } -} +pub mod common; const NUM_DOCS: u32 = 2_000_000; -pub fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader { - use tantivy_columnar::ColumnarWriter; - - let mut columnar_writer = ColumnarWriter::default(); - - match card { - Card::MultiSparse => { - columnar_writer.record_numerical(0, "price", 10u64); - columnar_writer.record_numerical(0, "price", 10u64); - } - _ => {} - } - - for i in 0..num_docs { - match card { - Card::MultiSparse | Card::Sparse => { - if i % 13 == 0 { - columnar_writer.record_numerical(i, "price", i as u64); - } - } - Card::Dense => { - if i % 12 == 0 { - columnar_writer.record_numerical(i, "price", i as u64); - } - } - Card::Full => { - columnar_writer.record_numerical(i, "price", i as u64); - } - Card::Multi => { - columnar_writer.record_numerical(i, "price", i as u64); - columnar_writer.record_numerical(i, "price", i as u64); - } - } - } - - let mut wrt: Vec = Vec::new(); - columnar_writer.serialize(num_docs, &mut wrt).unwrap(); - let reader = ColumnarReader::open(wrt).unwrap(); - reader -} - pub fn generate_columnar_and_open(card: Card, num_docs: u32) -> Column { - let reader = generate_columnar(card, num_docs); + let reader = generate_columnar_with_name(card, num_docs, "price"); reader.read_columns("price").unwrap()[0] .open_u64_lenient() .unwrap() @@ -116,9 +56,8 @@ fn bench_group(mut runner: InputGroup) { column.first_vals(&docs, &mut buffer); for val in buffer.iter() { - if let Some(val) = val { - sum += *val; - } + let Some(val) = val else { continue }; + sum += *val; } } diff --git a/columnar/benches/bench_merge.rs b/columnar/benches/bench_merge.rs index b30efd43ad..9e6d33d41e 100644 --- a/columnar/benches/bench_merge.rs +++ b/columnar/benches/bench_merge.rs @@ -1,7 +1,7 @@ -mod bench_access; +pub mod common; -use bench_access::{generate_columnar, Card}; use binggan::{black_box, BenchRunner}; +use common::{generate_columnar_with_name, Card}; use tantivy_columnar::*; const NUM_DOCS: u32 = 100_000; @@ -13,8 +13,8 @@ fn main() { inputs.push(( format!("merge_{card1}_and_{card2}"), vec![ - generate_columnar(card1, NUM_DOCS), - generate_columnar(card2, NUM_DOCS), + generate_columnar_with_name(card1, NUM_DOCS, "price"), + generate_columnar_with_name(card2, NUM_DOCS, "price"), ], )); }; diff --git a/columnar/benches/common.rs b/columnar/benches/common.rs new file mode 100644 index 0000000000..b087b0ac81 --- /dev/null +++ b/columnar/benches/common.rs @@ -0,0 +1,59 @@ +extern crate tantivy_columnar; + +use core::fmt; +use std::fmt::{Display, Formatter}; + +use tantivy_columnar::{ColumnarReader, ColumnarWriter}; + +pub enum Card { + MultiSparse, + Multi, + Sparse, + Dense, + Full, +} +impl Display for Card { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + Card::MultiSparse => write!(f, "multi sparse 1/13"), + Card::Multi => write!(f, "multi 2x"), + Card::Sparse => write!(f, "sparse 1/13"), + Card::Dense => write!(f, "dense 1/12"), + Card::Full => write!(f, "full"), + } + } +} +pub fn generate_columnar_with_name(card: Card, num_docs: u32, column_name: &str) -> ColumnarReader { + let mut columnar_writer = ColumnarWriter::default(); + + if let Card::MultiSparse = card { + columnar_writer.record_numerical(0, column_name, 10u64); + columnar_writer.record_numerical(0, column_name, 10u64); + } + + for i in 0..num_docs { + match card { + Card::MultiSparse | Card::Sparse => { + if i % 13 == 0 { + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + Card::Dense => { + if i % 12 == 0 { + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + Card::Full => { + columnar_writer.record_numerical(i, column_name, i as u64); + } + Card::Multi => { + columnar_writer.record_numerical(i, column_name, i as u64); + columnar_writer.record_numerical(i, column_name, i as u64); + } + } + } + + let mut wrt: Vec = Vec::new(); + columnar_writer.serialize(num_docs, &mut wrt).unwrap(); + ColumnarReader::open(wrt).unwrap() +} diff --git a/columnar/compat_tests_data/v1.columnar b/columnar/compat_tests_data/v1.columnar index 512b4394e00228b0ace57f59879ef59d02c0d70a..58e7b9efbf424af5d00e78d60ccc5c211c2aded2 100644 GIT binary patch literal 31754 zcmeI)aa|`u!faKlXVI zefr>ip5I4*^dRNz*>k(wGRk91zbVRvv-I0qCHzu(wsQWjod5a(<-h-~@>sDVq6`s7 zs0a_}lp3Of*Sgo@A*I>JDh2n%5&9E6MT z5I!P-2oTCLb}VDZGIlIu$1-*x;s_O?A#{X+FcB8QMmPu;;URoP0udmT<>lcwy>>Y} zma}6yJC?Hp5l5&94WT0pgo&^aHo`%;2oK>S5{Lkyq_QKG9jWX{Wk)JI5OIWx&=5Mp zK$r*%VIv%bi|`OWB7q1H$_jR@XbZpT>=o=-!HyN|K*SL$LPO{X17RX8gpF_zF2Y0j zhy)@)C@a~qk{v7Av63At*@1{7RD_1m5eC9USO^>8AY6op@DT|_fKXPkV--7Av11iG zRSOi5OIWx&=5MpK$r*%VIv%bi|`OWB7q1H zN*X)T*pbGLG;#w*^$nUbao)(2o<3rbcBI05f;KmI0zTv zA$&vv5g?Sc>{!c=wd`2Sj)5f59qZV! zjvedRfrukigoe-&2Es&G2pi!bT!e@45eYdUB0wk`*|Cuw8`-gu z9UIw!h$B>lhR_iP!bDgI8{r^agop4E2}FQUHnC$9J2tUn6FWAs0})542o0el41|fW z5H`X=xCjs7BNB)Jp=7cnlO37t$Ye(*I}mY%iqH@`!a$e^3t=N1gp2SHJ|ck#5XxqD zY-Y!1c5G(HW_BRr2o<3rbcBI05f;KmI0zTvA$&vv5g?Q;?AXGNE$rCBjxFp!#1Se& zL+A(tVInMqjc^by!bA9o1R_8vTiLOd9b4J4l^t8zfrukigoe-&2Es&G2pi!bT!e@4 z5eYJDh2n%5&9E6MT5I!P-2oOp(JF?l4&5mq#WU~ViN2mx5p(6~0iLekh z!a=wQ58)#chybB%XUBGSY-h)Ic5G({B92fI8bU`H2oqr;Y=nbw5gx)vBoF~Y$zewh zJ95~O!;TzwAmRuWp&@jHfiMvk!bUg<7vUj%L;?{YlpXBY!Hym5*ujn+>_Eg3Dndi( z2m@gvEQF145H7+)_=p4|Kqx!ev6CG;*|C!yJK2GVBUFTj&=Cg0L|6zL;UHXuhwu>z zM1W9sv11oIcClj@J9e=H5l5&94WT0pgo&^aHo`%;2oK>S5{LkyPZ?5CKBj!;U@t``E*dJ?z*c|G+Xt9HAmKgpM!}Cc;A42nXRJJcN%( zAOeK4w>PZ?5CK9t$c}^TI2a4R=>rGZagZH|I6_5e2pwS{OoWB7 z5e~vdcnBYnKm-Wo5IYXB;}AOzvEvXs5OIWx&=5MpK$r*%VIv%bi|`OWB7q1H%3*dK zX2)T69A?L1b|B&i6`>(?gn=*-7Q#k22p8cYd_)2fAd~`j6tJU!9R=(tUCIKmD@9HAmKgpM!}Cc;A42nXRJJcN%( zAOeJPlpROeag-fL*>RK|h&Vz;Xb2r)AWVdXun`WzMR*7wkw63pPZ?5CK9dVn-1> zir7)ajv{s-;s_O?A#{X+FcB8QMmPu;;URoP0udmT6YMy_juYXI)B`6JrRYS8vP_X* zx8GUx)QP9g|Ebfs@U>2*ExhgF?FesYc)P;e9p0Ys_J+4Fy#3)F2=8Eco$wBYcR0Lm zct^rJ8s4$+j)!+5yp!Rb3h#7yXOzd5DlLEMxBu9A`Mv#{d2#I&=1}nk$p~^|Q!F&o7B55WPQ1JiH|u zJ-_JrCG!NL_a~XB7yb7ay+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_W zlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{ z-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)Ti zUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4By+6_Wlgtx{-k)TiUiA4B zy+8lA-=B-#`LeUO-DRX)ILaO2Z42*v!`mL-_lLJ5ymyDUGraeNw=2B&hPOMsR(N~D zdtZ2a!+U>t`@;J`c>BZqx$q8z_lw~j4DW;Cb;A2lc!$FKjqnbK_uJuh!}~~hN5cDP zct^wg!|;xU_s8KK5AWmQoe1v};hhYxAKt0({yMzV;e9f^GvR$IytCnbI=pk?{d0Kd z!~1M_gYZ5V-i7c!AKu0A{yV(N?O`mFNR?PA5tB%hNS9bAks+~BB2!|EM3zLhM2^Hx ziCl?25_uB)CGsT>NfbyNl_-=bk|>tASfWJY5{XiYlM-bTm z@(GeEu~H%?ktUHYu}&gGVxvT+#1@GxiEN1+iJcO;5_=@_B=$?>OB|9YkT@z)C{ZL) zEOD_!iNqxmr4lD4$|TAqDkLtIsFb)&qDta&iE4=}Bx)qClBktXCF&%umZ+DwMxsID zI*CS!8zh<}PDwONoR-ifZkA|~xK*N6;*5lHPCkDmQYBVO#3a%r(k0eOWJqk3$duS3 zktLBWkt4BFB3ELMM4rTciF}De5(N@RB?={qB#I?2mMD?9M50vUq(qrSxkQD;r4p4A zmq}DfTrN>9afL*U#8nct5~@U<#MKh@64yvHNL(k;C~<>Clf)^BW{J}hy2Q;AEfTj% zv@XTTGq>HPoH>{Ba7l-%_RZ|8+&{2C7|J4)wEYhF6^(wvjV$(fUtFB^DS zMwwnVSyuY;-j}DAHRg)js@7LczpA3LzcT&R z&99z#b;)HtmnpAlc+Kc*3aUD*f~wm1a6I?&w##QPuX^p^Ycs1`s;8>UUf1`!*cDAz zj9*dw`tH{+zP`R@q$dB$jw|P{ths9Fs+>0%ZMReH4ZjrUf**4)b(XI^xY79d(+#;-(K9*-L%+L|BjJ&=_+%ZaC%pB z*F;yzM|wV@+}m*P=)DCW?fhu)(c13e?%a>HeQfq)Ro0-D`SF&IPkp?sr>`gWiKb7C zf1>!l?)w(+tN-N4C-Zwddgps8u=dNtU(S87?ZMdxtG+V$l}x9_nR3d$+V|DiLro8jKUDm+?yoI=t$t`^DF5pn zU!VVa%{PX=k@HRCn={|693B|X_?G^y$#0c@yZ76v4>vwM_Hd!w}wmVCeG`^sp;=;&y{4?2Gk{Gj%S!#~XZQQMDZ ze^fO#IF{+PcvD{4kNbWcd#vfP@yCkCyT=#D>whxxll;d!9-n`_=BGnH&6zMJW+p0s zHt@5IC-f&KpD6u#@6S^w8z;vm3xCn|i-lj*`L3V$%l2Q+{jz$>nacWA>#wGNRq^Zo zU#I`3`8N~4DVgq>R({*?+tJ?^JlXkV@MP`phJTkk(>60ZQ}z47-)BD6^3>E*Wr@B- zY_@53e75)x-G5m8L;ceuPv`%!r+jzm_lkULE!eOW($YA5HhJTgv|n0dw5L literal 32092 zcmeI)aa7)A`N#2l6BQRtR8(9v9~BiREu5&hXx>g#RGK)MqSCybI8kxpL`B8zp2MU< zB_pFkyE8H>R5CIu$~z;Y#3Cc3LcKLIGAc4MGRoMxulMQqZ_i)9bH3j{p3iH~4xh7g zo$Gq|XXjb8ef!Q0@li^YLK+!lkwYE@q!!VKG&0B{hdc^MEv64?WROJ;c@&V^gFd8@ zK^8gWQ9vq#KBSRB7CGcmKx$9=kVXbs%2A)Z6Go8X07fLmmaBvgkt^8Dx<| z9tEV{P9M_9Ad4LGC?K_zKBSRB7CGcmKx$w5kVXbs(`uIpk45>OlICMh02rkVgTj zgEqt`g)}nAB8NN*NF7Wc(#Rl-9P%h2l}jJe$RLXx@+cs62z^K+gDi5$qkz;h`jAEj zS>%vM0jYP;hcq(CB8NN*NafLoG&0B{hdc^My^}tqkwF$Y%2A)Uos-jSRBLA&&x5h4dkf46?`}j{;Kf zq7P|gkVOu86p&g;AJWJmiyZPOAoXtgkVXbsGKAJWJmiyZPOAXQ8s(#Rl-9P%h2bpm}zBZDk*$fJPNi5udR zLK+!lkwYE@q)ws_X=IQ^4tW%iDxnW)WROJ;c@&U3nLebEK^8gWQ9x=neMlpNEON-B zfYf{FLmC-mkwYE@q)O>S8X07fLmmaB-b)|S$RLXx@+cs+hCZZ`K^8gWQ9$Yx`jAEj zS>%vM0Vz!%(#Rl-9P%h2bt-*GBZDk*$fJPNTKbSi23h2gM**qR=tCMAWRXK21*FR8 zLmC-mkwYE@q)w*~X=IQ^4tW%iI)gr>kwF$Y%vM z0jaa-LmC-mkwYE@q|T-fX=IQ^4tW%iI)^@_kwF$Yqz`FikVOu86p*T>4{2nOMGkoskh+LIq>(`uIpk45>SFqkMh02rkVgTj zOXx!y8Dx<|9tEUo=tCMAWRXK21*G0jAJWJmiyZPOAhnS`q>(`uIpk45>QefUMh02r zkVgS2Lm$$}Ad4LGC?ItieMlpNEON-BfYc`XkVXbs%2AR2_XtBZDk*$fJPN zRrDc^46?`}j{;Ix(}y%N$RdY43P@c;AJWJmiyZPOAXQHv(#Rl-9P%h2buE2JBZDk* z$fJPN7W$Az23h2gM**qp=tCMAWRXK21*972LmC-mkwYE@q^_qAX=IQ^4tW%i+DaeN z$RLXx@+cs61ARy%gDi5$qkvQ+eMlpNEON-BfYgojA&m^O$RUpcQa5dgPYP*dkVOu8 z6p*@^KBSRB7CGcmK&pv8q>(`uIpk45>K6KtMh02rkVgTjTj@g@8Dx<|9tEUsqYr6h zkVOu86p(7B4{2nOMGkoskh+~dq>(`uIpk45Y8!n>BZDk*$fJPNcKVP;23h2gM`62C zZ*5*D9Wgs&cExltyJPmm?2XwMvp?oQ z%)yvG=1|Pxm?JSqV~)ifk2w)D#GH&d6>~b~Ow8Gsb1~;*s)b1`Jnbq&kSWL#WD9Zx zxq>`FN{}xo5EKfE1jT|9L8(9s$^_+t3PGiyN>DAR5g0+OpiWRPXb?0Cngq=|)S(L- zv!q~QHvhFbJGO1#x$r+qcTddjdR-V?vN`FN{}xo5EKfE1jT|9L8(9s$^_+t3PGiyN>DAR5g0+OpiWRPXb?0Cngq=P zwfl^A%}0f$Sfa&??82|Ag}>PS5#y{{ynD5Fz3kZaNwjVIT~+cc@_);p zB|rba4KDsvGQYIOw;`EdGQV^tknB%7)7y~TU-J7etp!T%FRk$<_m|vXS__oiUs~f! z?k~B&v=%72zqH1e++T8kX)RE4e`$>`xxeK8(psS8{?Zy>a(~JFrL{oG{iQX&%kke|~=_TzAU$+wZcAc89tnW=qTu#B7cEp_pwk?}^zS^S+oJ zG4GGr8S|4dyJ9{V)5Ux!W_QeoWA?=SV$9x{kH+ka`FPC!m`}tUi203}gE7Ay)5m-& z=1|P1V-CmsVa$=3KZ-dT^SPK~F`th)9`lzmCt|)BGsJu)=48xQV@}0jhPU3k21Iiv=};jRGUsB&Zc^7Ssu@7Ss#22pR-i1&xB6 z1Wkfl1Oa6fflS4lnKrdlnc%lR0!4yDh2BWRe}oy)q;x!HG+)- zBiJOU6>JvN39c5@3$_Rv1X~4-f|~?Qf?Ea63s|%5_Pf-!or|8#RBFG4uQOV-`)iJB z)vzjD)V^qTQT5`%#i>17_L$nEBBL)OXV0cRC-&5Pb?=qAcm3X@dlzSRX3l4teTMca zSkks+W=YlC2HuvJWwR!;%HQ7m_UxsNOUIX%?(6o=$gayC$u8QjW52omYWDa0=O55| z!1Mu?IsG}g2R0uV4lFyU=b)^E8x9^jxFokLR~=G&$nYVB%i5RCE~|dW;5$-zEqPOU z74PhOXU_7bd|zr^%etv`73cPyn^W0TIZ>(4>pm}Yef|2;^~L9RowB-y-rBfzd~4|q z?uLxUy2g>lq8mGIoV&5+CVx}@&8;_2-(1<$-;{ex^DW_)vRiv@&AP4Owz1nvn!B3S z?X|ZL-(I+_ecSA|>g|KuQ?|uU*@_)~J96%5x?|!Fy|a5~W=nm`XiM>(op;XPX+ALY zfr7i*?wYx)s&$|>?}PS(lOHVqQ16GbKiv4?@eh}_xweeE>+T-8yXc;dd*<$`x!2#D z-`?6j-Cp^T{*UC|*L+{Nuk52eAI<7$=osrL`B>M-)cv*h58q$-@%E3;e!RMKuru|E zmQPH5qT-W%pUnAG)2AjrrMtSjG9RdaVDy3F2Rk2}f6#n-=+gzR&CR%~&kTGf?;-op zcj@Qc=Q19ydwAsGqR)4He(v)%J-#RZ3$0(6{zBy!`@fj`rRFb% zFO~K7^kzNM@W|LBC69JJsvfI-Z1}OlzV^P^zUnUzemV7c%i~jzSA3=KD>?m5{S*EA ztKDDCe4_q|(I<+(*7>#hubF|Nfr78MeSPNZRo@u+M&38=Hz&VYKG-{${jJ7tjeo23 z+wR*LPu4v-@???k@N>TAJN`TQPqjWZ{Z!?5`@fqz)I1c1%D&h0y{xAjo*sL;&8dMi+<7Zi@9Ia z{L=q2|5vTQn*LSgME^wY3(YTt7s_7jc`@syhL^@(DhXXdydb1+wR|HPSsD1P8I*I^LO*VGp`N3R`C0_-_QKMYI~>xGPriT>viFq^jg%u3%O!Xr3!bwv8VcO)ti~GePq#3 zmNe8}vE_yzD@juO!=$AGumXrU^$zNY3|8KRSq+YyeqKHw% zDH0R`g;FFbQWR;5424l-DRLBfijYEzxTA5u=DxBq#z3rAShwDAE)e3ZuwU|GB1w^=NK<4ej3P^sqsUW)6jIC`#oSTM9mU*H z%pDXlia14rBA`%;Bt?oMO_8B6iY!HrB2N)g$Q16FG8p})7f<1iDcmuIJ1Allaf$>* zK%o>#iWEhfB12&mS&AG*o+6}>soXJ@JEn5SRPLC{9TYK&I7NaYpiqh=MT#O#k)be( zEJcnYPZ3hcH13$j9n-jD8h1?N4vH8>oFYLHP$)%`B1Ms=$WRzXmLf-yrwA#eggZ*O zql7z3xWj*`tcfB<5vNE{1QbeIM6fuf8MS>!rP>Li)iXu&sp)iUpMUEm*5mLxZ?wH9PGr40X zcg*AtiWo(lB0&*QC`FPYMUke+P#8s)B1e&@2q|P1cg*6BS==#;J7#eQMT{a&k)Q}D zlp;xyqDWI@D2yUYk)y~{gcLHHJ7#moZ0?xN9kaQEB1RFXNKgb6N|B^UQKTs{6h@Jy z$Wi1eLJIz;{L-0a+)>6IW!zE59sWy8V-#_U1Vuoh6iJE{MVcZ*VH8=497UcYq>wrM z=P`#n=5WUx{|na7;SP!zMVul*5l|>ak|ITsrpQnjMV2B*k*5eLWG;8i<&L@BF_$~$ zatB3>B2JN@2q=^yNs*#RQ)DQNB1@5@$Ww$AGLJjvamPIFn8zLSxPu}_5vNE{1Qbe< zq)1VuDKZpBk)_B{oFYLHP$)%`B1Ms=$WRzXmLf-y zrwA!zK6lLLj``d%pF8Gr2Stn`PLZGpD3l^ek)lXbWGIXxOOd0fQzR$? z3Z+O=q$tu9849DwQsgM|6d{FFa7P7qRB%THcT{i(MT{a&k)Q}Dlp;xyqDWI@D2yUY zk)y~{gcP!fI~H-rBJNnk9gDbwB1RFXNKgb6N|B^UQKTs{6h@Jy$Wi1eLJC>T9gDeR zF?THHj>X(T5u=DxBq#z3rAShwDAE)e3ZuwUD+NTcbv{06fuf8 zMS>!rP>Li)iXu&sp)iUpMUEm*5mHDccT{pmC3jSEMS9ZR@_B1RFXNKgb6N|B^UQKTs{6h@Jy$Wi1eLJB#PJI>^eGr8kT?l_Y> zC}I?GiUdVKp%h7q6h)dMLtzwIiX26rBBYQi?x^C9D(*K%o>#iWEhfB12&m zS&AG*o+6}>W!$lh|2&p)$1?6%=6}H^iWo(lB0&*QC`FPYMUke+P#8s)B1e&@2q|Rw zhUhmvb2)b`=Z@vvv79?7Via+T1Vuoh6iJE{MVcZ*VH8=497UcYq>yUvsOFAp?x^OD zYVM$jQN$?{6aj@&Bq>r9X^ISmQDiA{6nTn}LRN6cioxhNy?6z8tl*9n+(8keh*Kme z0t%%_Qlu!-6d4Mm$Wr7e@)RM3tmKZB+_927R&vKm?x2WK#3>RK0fkZ|DN+<^iVTHO zWGQkKd5VxiR&mEF?pVbgtGHtocTmJA;uHyrfI=yf6e)@{MTWvCvJ^RrJVi($HQZ6d z9W~rh!yPr;K@p>fQzR$?3Z+O=q$tu9849DwQsgM|6d{GI=8o0evAQJsO|M$b9jm#6 zB1RFXNKgb6N|B^UQKTs{6h@Jy$Wi1eLJC>K9c#E_4R@^Jjy2pt5u=DxBq#z3rAShw zDAE)e3ZuwU!rP>Li)iXu&sp)iUpMUEm* z5mHDkchqu6EqBy%M=f_y#3*bbGYLi?l^}#C}I?GiUdVKp%h7q6h)dMLtzwI ziX26rBBYQy?x^FAI_{|Bjymq3h*88T5)=W2QY0x-6lsbKg;8WFauj)rkV4Moj&r%= zT<$oRJI>_}iWo(lB0&*QC`FPYMUke+P#8s)B1e&@2q|P8cdX-%b=8W#M?BpXuax{96 zMQ<3r$D{W|^vdlKiXA16Qb(Di+)?4EbW}O29W{9N2#OCQSPX4R6430)s7lRt)tFS?`Uu|I+`5K zjuuC&qs`Io=x}s8x*Xk(9!IaE&k;EK9Rm(I^kLN42BI zQR}F4)H@m+jgBTqv!lh)>S%McJ31VljxI;HqsP(f=yL>)e#gL_^44OJqVX@!r12j$ zFGhcexSEh6LsG;JDKaFJ*daxRWHLLX$dDAXLy8Q^6n03FA(_e!DKaF}*daxRq=X$( zWJsp7Ly8Q^40cG7Avuj5Qe;R<*&#)SWF|YL$dJrphZGr-+3b)aLsG^LDKaE;*daxR zWG*|T$dJrqhZGr-a&}0OA(_t(DKaDr*daxRWFb4G$dFX9Ly8Q^B6diTAz920DKaFd zvqOpuNhLd^$dH`D4kraG9<5HhZGr-4eXF2L-I;?NRc6FW``6Rl2@@qiVVrc?2sZu@@jTSks)behZGr- z*RVs149O+zkRn6!T6Rd0A!%iY6d95jJEX{vT*?k9G9<5KhZGr-Hg-snA$dJJq{xt5 z#ttbmByV7c6d96sc1V#SxttwRWJs=HhZGr-H?l*D3`qw&q{xuGi5*g8NaE~}B17_K zc1V#S>12l#8IrfKLy8Q^mF$orL-JO3NRc7wVuutNlDDx#iVVqB?2sZu@^*Geks;}3 zhZGr-tJxt%hU6M{NRc5)utSOrNe?@u$dFvi4kJEX{ve4HIpWJpr%kRn6! z33f=4A=%CjDKaFVWQP4?2sZuGRzJsG9;g7hZGr-2iPG+hU7EskRn5} zgB?<2NIuIBDKaDvvO|gtNtzu}WJq?hLy8Q^=hz`dhUD|?kRn6!1$Ic0A=$+aDKaEq zWQPE^B15v99a3aSzQGPDG9=$*hZGr-Z?Qv)49OmLNRc7=Han!qkbH+7Qe;RT zVTTkM5@UxH8Itd^Ly8Q^qwJ6(L-IX#NRc7g%MK|rB;RL;6d97o*daxR49R|WNRc7QvO|gt$xqlJMTX>Yc1V#SIlvAnG9*7` zhZGr-C)goHhU91LkRn5JkR4KFNPf-^DKaEavO|gt$uHO;MTX=MJEX{v{E{6~WJq%C zkRn6!D|SecAvw$rDKaF#W``6RlHagHiVVqb*&#)Sgc1V#S`3F0s$dDXohZGr-f3ic049N@ZkRn6!FLp?gAvwVgDKaGgW``6RlK-$n ziVTU*UTi|@+GH=v_+O|LTa!2$IM(_KgcXRZ9AbPh%?>*5w6utLG@7CzOFM78{Zz_7XNALa7I~=_aMDLF1 zeK2}=M(^jNcUSa&DSAhu_o3*`MDN4VyE}Tn8NGX=_dC&R(ferh?v38ZqIX~P{wR9) zNAFLf_dxVM5xoba_sQrz6ur6VJsiEiiQXg8`*ie>L_u{aFjY`Im#S!9p#P%jta+ON2Ozlqsp<& zQSDgisBx@u)H>EW>KyAF^^Wr$4UP*OjgAW)O^ywYX2->j7RM!yR>!4|HpgX-cE=Tt z4oBS4>A2F-<+#ex?YPF#$u+0=eWTUIBs_IJ8pFhI5s-?!7<5yf)qQZI!YWf z9Hov~jxxtwN4aBxqr$P+QR!IXsB$cGR6ABWY8-1EwT`upI>$Okz2kgGgX026qvJwH zlVgLU*>SO>#c_$F)p4n#&2gEd-EoDZ!x49MI<9ndIj(YaJFao`IIeT_I<9y0Ic{(S zj+-6*j$0iAj*Skv(|`UriXBrOC5{=6QpYSunPaY_+_At>;aKdbbS!aHIhHx99V;C* zjx~;2$6807W1XYkalWI$ae<@JaiOEhvBA;oxY*I+xWv)wxYW_+xXjV+xWdulh&wtR zS30^JS2?;J*Eo6{*ExC}*E{+gH#h>v&5nM@t&V|lT)6SJyJh2@MURw-EE)d?8x@`U zM;&#`KB+AlDhi7_CheJ2HFc_x{ZkH4X_~rYYRR`9d(pl{wTp)qhl@K--*b9ZAd7)l7dSS~&yDlng3YrczHNIl_6~!BR zHtgR}_sXrWJpRhgW^1l~)#g_neO24VnTso4J@D!yuWoMH*;4wNzSkUlO~WPIFOk=F zzjohiYg>m}!`6=2o>qmt59+*@4UI->~fsC*IK2 zzPG*R@-3GiyS)91-B(n;QQtWF#+Hs<9c6C{-gM|qjq%}l@tb?zy#LL0om)GPcXqzT z-co(#<|~h0+4k1VTPwN-x{h=;zisE+O0Vj>>flukZ{PlQ>F(~{*Ij${(ADA99oOu+ zrYbR*7)!MFjP#UW+kfrhYn!gyab3wfdf##29reB2dQbFry>st7Yp&mN{juxY-?jT) zm3`Vb+Sl^#UGFZtA-Lht4UIPr-&h><1p9-!o3`F`{HD&E?dIxRHs5membU&(f5m$S z-gD$V&A0Bnwe-Dx?>+e5hJoz^a$EOp`);eSqloQY-cp?GN$yY9-Lv(cD))8Yx9`5%j}3h+{8-1fJ=>~2KKSvmkGG~qQstlM|HR=>G;QCpz2uXyy2lRo_2Uoa9Lf*pr;G=6sYv&9egJh=bC zy7boc@pR`-+gbg&&7V8^xwg+|K40;LfiE2SLi4VjyGp;<_r-%>Z1~dlFUgm?zr63u zwIf3#;Yi0<_I#!4p}~j79%}vS$XClV{h7m=rmyYzTFJw`4}|?(Qmcv*|n$a+rhUFeY^2H!`~@>r00?SkJQ;#J8qrdweMCx zy7|$gkG6d;^Sz3_1AC9`ZT|kw@0UK-_t?S58h)_-2ePkw-@bjdKOFjD_`{AL?fFsF zj|YD|_T$$5Bm2v<{n^9Wrl0KiNy+2Aj~{rv{=l{aCk}M|bnj1Vp4jrlu_xMpw)nWp@Xe92hv*nzS7KWzKMi9dAxaqk~%{UT8~(ZdpYlTY3;SNE{nyaH!hdy~*mI)l--G`i`*-VqM*dU&V*iVW$N#Z#ADZ<4 z{XcMz&c3Vcu9N>Txc~b9rJgFsXh7YQ{>s#zfkgzr;2_$qp$nw>u Column { .map(|value_row_id: RowId| self.values.get_val(value_row_id)) } - /// Get the docids of values which are in the provided value range. + /// Get the docids of values which are in the provided value and docid range. #[inline] pub fn get_docids_for_value_range( &self, diff --git a/columnar/src/column/serialize.rs b/columnar/src/column/serialize.rs index 4198487bb5..73fc5e7f51 100644 --- a/columnar/src/column/serialize.rs +++ b/columnar/src/column/serialize.rs @@ -12,7 +12,7 @@ use crate::column_values::{ CodecType, MonotonicallyMappableToU128, MonotonicallyMappableToU64, }; use crate::iterable::Iterable; -use crate::StrColumn; +use crate::{StrColumn, Version}; pub fn serialize_column_mappable_to_u128( column_index: SerializableColumnIndex<'_>, @@ -40,7 +40,10 @@ pub fn serialize_column_mappable_to_u64( Ok(()) } -pub fn open_column_u64(bytes: OwnedBytes) -> io::Result> { +pub fn open_column_u64( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result> { let (body, column_index_num_bytes_payload) = bytes.rsplit(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload @@ -49,7 +52,7 @@ pub fn open_column_u64(bytes: OwnedBytes) -> io:: .unwrap(), ); let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); - let column_index = crate::column_index::open_column_index(column_index_data)?; + let column_index = crate::column_index::open_column_index(column_index_data, format_version)?; let column_values = load_u64_based_column_values(column_values_data)?; Ok(Column { index: column_index, @@ -59,6 +62,7 @@ pub fn open_column_u64(bytes: OwnedBytes) -> io:: pub fn open_column_u128( bytes: OwnedBytes, + format_version: Version, ) -> io::Result> { let (body, column_index_num_bytes_payload) = bytes.rsplit(4); let column_index_num_bytes = u32::from_le_bytes( @@ -68,7 +72,7 @@ pub fn open_column_u128( .unwrap(), ); let (column_index_data, column_values_data) = body.split(column_index_num_bytes as usize); - let column_index = crate::column_index::open_column_index(column_index_data)?; + let column_index = crate::column_index::open_column_index(column_index_data, format_version)?; let column_values = crate::column_values::open_u128_mapped(column_values_data)?; Ok(Column { index: column_index, @@ -79,7 +83,10 @@ pub fn open_column_u128( /// Open the column as u64. /// /// See [`open_u128_as_compact_u64`] for more details. -pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result> { +pub fn open_column_u128_as_compact_u64( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result> { let (body, column_index_num_bytes_payload) = bytes.rsplit(4); let column_index_num_bytes = u32::from_le_bytes( column_index_num_bytes_payload @@ -88,7 +95,7 @@ pub fn open_column_u128_as_compact_u64(bytes: OwnedBytes) -> io::Result io::Result io::Result { +pub fn open_column_bytes(data: OwnedBytes, format_version: Version) -> io::Result { let (body, dictionary_len_bytes) = data.rsplit(4); let dictionary_len = u32::from_le_bytes(dictionary_len_bytes.as_slice().try_into().unwrap()); let (dictionary_bytes, column_bytes) = body.split(dictionary_len as usize); let dictionary = Arc::new(Dictionary::from_bytes(dictionary_bytes)?); - let term_ord_column = crate::column::open_column_u64::(column_bytes)?; + let term_ord_column = crate::column::open_column_u64::(column_bytes, format_version)?; Ok(BytesColumn { dictionary, term_ord_column, }) } -pub fn open_column_str(data: OwnedBytes) -> io::Result { - let bytes_column = open_column_bytes(data)?; +pub fn open_column_str(data: OwnedBytes, format_version: Version) -> io::Result { + let bytes_column = open_column_bytes(data, format_version)?; Ok(StrColumn::wrap(bytes_column)) } diff --git a/columnar/src/column_index/merge/mod.rs b/columnar/src/column_index/merge/mod.rs index 1aec9f71c3..3c4c3df4fa 100644 --- a/columnar/src/column_index/merge/mod.rs +++ b/columnar/src/column_index/merge/mod.rs @@ -95,8 +95,12 @@ pub fn merge_column_index<'a>( #[cfg(test)] mod tests { + use common::OwnedBytes; + use crate::column_index::merge::detect_cardinality; - use crate::column_index::multivalued_index::MultiValueIndex; + use crate::column_index::multivalued_index::{ + open_multivalued_index, serialize_multivalued_index, MultiValueIndex, + }; use crate::column_index::{merge_column_index, OptionalIndex, SerializableColumnIndex}; use crate::{ Cardinality, ColumnIndex, MergeRowOrder, RowAddr, RowId, ShuffleMergeOrder, StackMergeOrder, @@ -171,7 +175,11 @@ mod tests { let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { panic!("Excpected a multivalued index") }; - let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); + let mut output = Vec::new(); + serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); + let multivalue = + open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5]); } @@ -200,11 +208,16 @@ mod tests { ], ) .into(); + let merged_column_index = merge_column_index(&column_indexes[..], &merge_row_order); let SerializableColumnIndex::Multivalued(start_index_iterable) = merged_column_index else { panic!("Excpected a multivalued index") }; - let start_indexes: Vec = start_index_iterable.boxed_iter().collect(); + let mut output = Vec::new(); + serialize_multivalued_index(&start_index_iterable, &mut output).unwrap(); + let multivalue = + open_multivalued_index(OwnedBytes::new(output), crate::Version::V2).unwrap(); + let start_indexes: Vec = multivalue.get_start_index_column().iter().collect(); assert_eq!(&start_indexes, &[0, 3, 5, 6]); } } diff --git a/columnar/src/column_index/merge/shuffled.rs b/columnar/src/column_index/merge/shuffled.rs index f93b896354..9a985b4b97 100644 --- a/columnar/src/column_index/merge/shuffled.rs +++ b/columnar/src/column_index/merge/shuffled.rs @@ -1,6 +1,8 @@ use std::iter; -use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_index::{ + SerializableColumnIndex, SerializableMultivalueIndex, SerializableOptionalIndex, Set, +}; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, ShuffleMergeOrder}; @@ -14,15 +16,24 @@ pub fn merge_column_index_shuffled<'a>( Cardinality::Optional => { let non_null_row_ids = merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows: shuffle_merge_order.num_rows(), - } + }) } Cardinality::Multivalued => { - let multivalue_start_index = - merge_column_index_shuffled_multivalued(column_indexes, shuffle_merge_order); - SerializableColumnIndex::Multivalued(multivalue_start_index) + let non_null_row_ids = + merge_column_index_shuffled_optional(column_indexes, shuffle_merge_order); + SerializableColumnIndex::Multivalued(SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids, + num_rows: shuffle_merge_order.num_rows(), + }, + start_offsets: merge_column_index_shuffled_multivalued( + column_indexes, + shuffle_merge_order, + ), + }) } } } @@ -102,11 +113,18 @@ fn iter_num_values<'a>( /// Transforms an iterator containing the number of vals per row (with `num_rows` elements) /// into a `start_offset` iterator starting at 0 and (with `num_rows + 1` element) +/// +/// This will filter values with 0 values as these are covered by the optional index in the +/// multivalue index. fn integrate_num_vals(num_vals: impl Iterator) -> impl Iterator { - iter::once(0u32).chain(num_vals.scan(0, |state, num_vals| { - *state += num_vals; - Some(*state) - })) + iter::once(0u32).chain( + num_vals + .filter(|num_vals| *num_vals != 0) + .scan(0, |state, num_vals| { + *state += num_vals; + Some(*state) + }), + ) } impl<'a> Iterable for ShuffledMultivaluedIndex<'a> { @@ -134,7 +152,7 @@ mod tests { #[test] fn test_integrate_num_vals_several() { - assert!(integrate_num_vals([3, 0, 10, 20].into_iter()).eq([0, 3, 3, 13, 33].into_iter())); + assert!(integrate_num_vals([3, 0, 10, 20].into_iter()).eq([0, 3, 13, 33].into_iter())); } #[test] @@ -157,10 +175,10 @@ mod tests { Cardinality::Optional, &shuffle_merge_order, ); - let SerializableColumnIndex::Optional { + let SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows, - } = serializable_index + }) = serializable_index else { panic!() }; diff --git a/columnar/src/column_index/merge/stacked.rs b/columnar/src/column_index/merge/stacked.rs index ba91b8d64c..9c0890abe8 100644 --- a/columnar/src/column_index/merge/stacked.rs +++ b/columnar/src/column_index/merge/stacked.rs @@ -1,6 +1,8 @@ -use std::iter; +use std::ops::Range; -use crate::column_index::{SerializableColumnIndex, Set}; +use crate::column_index::multivalued_index::{MultiValueIndex, SerializableMultivalueIndex}; +use crate::column_index::serialize::SerializableOptionalIndex; +use crate::column_index::SerializableColumnIndex; use crate::iterable::Iterable; use crate::{Cardinality, ColumnIndex, RowId, StackMergeOrder}; @@ -15,20 +17,146 @@ pub fn merge_column_index_stacked<'a>( ) -> SerializableColumnIndex<'a> { match cardinality_after_merge { Cardinality::Full => SerializableColumnIndex::Full, - Cardinality::Optional => SerializableColumnIndex::Optional { + Cardinality::Optional => SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids: Box::new(StackedOptionalIndex { columns, stack_merge_order, }), num_rows: stack_merge_order.num_rows(), - }, + }), Cardinality::Multivalued => { - let stacked_multivalued_index = StackedMultivaluedIndex { - columns, - stack_merge_order, - }; - SerializableColumnIndex::Multivalued(Box::new(stacked_multivalued_index)) + let serializable_multivalue_index = + make_serializable_multivalued_index(columns, stack_merge_order); + SerializableColumnIndex::Multivalued(serializable_multivalue_index) + } + } +} + +struct StackedDocIdsWithValues<'a> { + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +} + +impl Iterable for StackedDocIdsWithValues<'_> { + fn boxed_iter(&self) -> Box + '_> { + Box::new((0..self.column_indexes.len()).flat_map(|i| { + let column_index = &self.column_indexes[i]; + let doc_range = self.stack_merge_order.columnar_range(i); + get_doc_ids_with_values(column_index, doc_range) + })) + } +} + +fn get_doc_ids_with_values<'a>( + column_index: &'a ColumnIndex, + doc_range: Range, +) -> Box + 'a> { + match column_index { + ColumnIndex::Empty { .. } => Box::new(0..0), + ColumnIndex::Full => Box::new(doc_range), + ColumnIndex::Optional(optional_index) => Box::new( + optional_index + .iter_rows() + .map(move |row| row + doc_range.start), + ), + ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { + MultiValueIndex::MultiValueIndexV1(multivalued_index) => { + Box::new((0..multivalued_index.num_docs()).filter_map(move |docid| { + let range = multivalued_index.range(docid); + if range.is_empty() { + None + } else { + Some(docid + doc_range.start) + } + })) + } + MultiValueIndex::MultiValueIndexV2(multivalued_index) => Box::new( + multivalued_index + .optional_index + .iter_rows() + .map(move |row| row + doc_range.start), + ), + }, + } +} + +fn stack_doc_ids_with_values<'a>( + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> SerializableOptionalIndex<'a> { + let num_rows = stack_merge_order.num_rows(); + SerializableOptionalIndex { + non_null_row_ids: Box::new(StackedDocIdsWithValues { + column_indexes, + stack_merge_order, + }), + num_rows, + } +} + +struct StackedStartOffsets<'a> { + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +} + +fn get_num_values_iterator<'a>( + column_index: &'a ColumnIndex, + num_docs: u32, +) -> Box + 'a> { + match column_index { + ColumnIndex::Empty { .. } => Box::new(std::iter::empty()), + ColumnIndex::Full => Box::new(std::iter::repeat(1u32).take(num_docs as usize)), + ColumnIndex::Optional(optional_index) => { + Box::new(std::iter::repeat(1u32).take(optional_index.num_non_nulls() as usize)) } + ColumnIndex::Multivalued(multivalued_index) => Box::new( + multivalued_index + .get_start_index_column() + .iter() + .scan(0u32, |previous_start_offset, current_start_offset| { + let num_vals = current_start_offset - *previous_start_offset; + *previous_start_offset = current_start_offset; + Some(num_vals) + }) + .skip(1), + ), + } +} + +impl<'a> Iterable for StackedStartOffsets<'a> { + fn boxed_iter(&self) -> Box + '_> { + let num_values_it = (0..self.column_indexes.len()).flat_map(|columnar_id| { + let num_docs = self.stack_merge_order.columnar_range(columnar_id).len() as u32; + let column_index = &self.column_indexes[columnar_id]; + get_num_values_iterator(column_index, num_docs) + }); + Box::new(std::iter::once(0u32).chain(num_values_it.into_iter().scan( + 0u32, + |cumulated, el| { + *cumulated += el; + Some(*cumulated) + }, + ))) + } +} + +fn stack_start_offsets<'a>( + column_indexes: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> Box + 'a> { + Box::new(StackedStartOffsets { + column_indexes, + stack_merge_order, + }) +} + +fn make_serializable_multivalued_index<'a>( + columns: &'a [ColumnIndex], + stack_merge_order: &'a StackMergeOrder, +) -> SerializableMultivalueIndex<'a> { + SerializableMultivalueIndex { + doc_ids_with_values: stack_doc_ids_with_values(columns, stack_merge_order), + start_offsets: stack_start_offsets(columns, stack_merge_order), } } @@ -62,87 +190,3 @@ impl<'a> Iterable for StackedOptionalIndex<'a> { ) } } - -#[derive(Clone, Copy)] -struct StackedMultivaluedIndex<'a> { - columns: &'a [ColumnIndex], - stack_merge_order: &'a StackMergeOrder, -} - -fn convert_column_opt_to_multivalued_index<'a>( - column_index_opt: &'a ColumnIndex, - num_rows: RowId, -) -> Box + 'a> { - match column_index_opt { - ColumnIndex::Empty { .. } => Box::new(iter::repeat(0u32).take(num_rows as usize + 1)), - ColumnIndex::Full => Box::new(0..num_rows + 1), - ColumnIndex::Optional(optional_index) => { - Box::new( - (0..num_rows) - // TODO optimize - .map(|row_id| optional_index.rank(row_id)) - .chain(std::iter::once(optional_index.num_non_nulls())), - ) - } - ColumnIndex::Multivalued(multivalued_index) => multivalued_index.start_index_column.iter(), - } -} - -impl<'a> Iterable for StackedMultivaluedIndex<'a> { - fn boxed_iter(&self) -> Box + '_> { - let multivalued_indexes = - self.columns - .iter() - .enumerate() - .map(|(columnar_id, column_opt)| { - let num_rows = - self.stack_merge_order.columnar_range(columnar_id).len() as RowId; - convert_column_opt_to_multivalued_index(column_opt, num_rows) - }); - stack_multivalued_indexes(multivalued_indexes) - } -} - -// Refactor me -fn stack_multivalued_indexes<'a>( - mut multivalued_indexes: impl Iterator + 'a>> + 'a, -) -> Box + 'a> { - let mut offset = 0; - let mut last_row_id = 0; - let mut current_it = multivalued_indexes.next(); - Box::new(std::iter::from_fn(move || loop { - if let Some(row_id) = current_it.as_mut()?.next() { - last_row_id = offset + row_id; - return Some(last_row_id); - } - offset = last_row_id; - loop { - current_it = multivalued_indexes.next(); - if current_it.as_mut()?.next().is_some() { - break; - } - } - })) -} - -#[cfg(test)] -mod tests { - use crate::RowId; - - fn it<'a>(row_ids: &'a [RowId]) -> Box + 'a> { - Box::new(row_ids.iter().copied()) - } - - #[test] - fn test_stack() { - let columns = [ - it(&[0u32, 0u32]), - it(&[0u32, 1u32, 1u32, 4u32]), - it(&[0u32, 3u32, 5u32]), - it(&[0u32, 4u32]), - ] - .into_iter(); - let start_offsets: Vec = super::stack_multivalued_indexes(columns).collect(); - assert_eq!(start_offsets, &[0, 0, 1, 1, 4, 7, 9, 13]); - } -} diff --git a/columnar/src/column_index/mod.rs b/columnar/src/column_index/mod.rs index f52e26ff49..1edfa16b28 100644 --- a/columnar/src/column_index/mod.rs +++ b/columnar/src/column_index/mod.rs @@ -11,8 +11,11 @@ mod serialize; use std::ops::Range; pub use merge::merge_column_index; +pub(crate) use multivalued_index::SerializableMultivalueIndex; pub use optional_index::{OptionalIndex, Set}; -pub use serialize::{open_column_index, serialize_column_index, SerializableColumnIndex}; +pub use serialize::{ + open_column_index, serialize_column_index, SerializableColumnIndex, SerializableOptionalIndex, +}; use crate::column_index::multivalued_index::MultiValueIndex; use crate::{Cardinality, DocId, RowId}; @@ -131,15 +134,41 @@ impl ColumnIndex { let row_end = optional_index.rank(doc_id_range.end); row_start..row_end } - ColumnIndex::Multivalued(multivalued_index) => { - let end_docid = doc_id_range.end.min(multivalued_index.num_docs() - 1) + 1; - let start_docid = doc_id_range.start.min(end_docid); - - let row_start = multivalued_index.start_index_column.get_val(start_docid); - let row_end = multivalued_index.start_index_column.get_val(end_docid); - - row_start..row_end - } + ColumnIndex::Multivalued(multivalued_index) => match multivalued_index { + MultiValueIndex::MultiValueIndexV1(index) => { + let row_start = index.start_index_column.get_val(doc_id_range.start); + let row_end = index.start_index_column.get_val(doc_id_range.end); + row_start..row_end + } + MultiValueIndex::MultiValueIndexV2(index) => { + // In this case we will use the optional_index select the next values + // that are valid. There are different cases to consider: + // Not exists below means does not exist in the optional + // index, because it has no values. + // * doc_id_range may cover a range of docids which are non existent + // => rank + // will give us the next document outside the range with a value. They both + // get the same rank and therefore return a zero range + // + // * doc_id_range.start and doc_id_range.end may not exist, but docids in + // between may have values + // => rank will give us the next document outside the range with a value. + // + // * doc_id_range.start may be not existent but doc_id_range.end may exist + // * doc_id_range.start may exist but doc_id_range.end may not exist + // * doc_id_range.start and doc_id_range.end may exist + // => rank on doc_id_range.end will give use the next value, which matches + // how the `start_index_column` works, so we get the value start of the next + // docid which we use to create the exclusive range. + // + let rank_start = index.optional_index.rank(doc_id_range.start); + let row_start = index.start_index_column.get_val(rank_start); + let rank_end = index.optional_index.rank(doc_id_range.end); + let row_end = index.start_index_column.get_val(rank_end); + + row_start..row_end + } + }, } } diff --git a/columnar/src/column_index/multivalued_index.rs b/columnar/src/column_index/multivalued_index.rs index eab82a3e30..cef5a1221d 100644 --- a/columnar/src/column_index/multivalued_index.rs +++ b/columnar/src/column_index/multivalued_index.rs @@ -3,73 +3,257 @@ use std::io::Write; use std::ops::Range; use std::sync::Arc; -use common::OwnedBytes; +use common::{CountingWriter, OwnedBytes}; +use super::optional_index::{open_optional_index, serialize_optional_index}; +use super::{OptionalIndex, SerializableOptionalIndex, Set}; use crate::column_values::{ load_u64_based_column_values, serialize_u64_based_column_values, CodecType, ColumnValues, }; use crate::iterable::Iterable; -use crate::{DocId, RowId}; +use crate::{DocId, RowId, Version}; + +pub struct SerializableMultivalueIndex<'a> { + pub doc_ids_with_values: SerializableOptionalIndex<'a>, + pub start_offsets: Box + 'a>, +} pub fn serialize_multivalued_index( - multivalued_index: &dyn Iterable, + multivalued_index: &SerializableMultivalueIndex, output: &mut impl Write, ) -> io::Result<()> { + let SerializableMultivalueIndex { + doc_ids_with_values, + start_offsets, + } = multivalued_index; + let mut count_writer = CountingWriter::wrap(output); + let SerializableOptionalIndex { + non_null_row_ids, + num_rows, + } = doc_ids_with_values; + serialize_optional_index(&**non_null_row_ids, *num_rows, &mut count_writer)?; + let optional_len = count_writer.written_bytes() as u32; + let output = count_writer.finish(); serialize_u64_based_column_values( - multivalued_index, + &**start_offsets, &[CodecType::Bitpacked, CodecType::Linear], output, )?; + output.write_all(&optional_len.to_le_bytes())?; Ok(()) } -pub fn open_multivalued_index(bytes: OwnedBytes) -> io::Result { - let start_index_column: Arc> = load_u64_based_column_values(bytes)?; - Ok(MultiValueIndex { start_index_column }) +pub fn open_multivalued_index( + bytes: OwnedBytes, + format_version: Version, +) -> io::Result { + match format_version { + Version::V1 => { + let start_index_column: Arc> = + load_u64_based_column_values(bytes)?; + Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 { + start_index_column, + })) + } + Version::V2 => { + let (body_bytes, optional_index_len) = bytes.rsplit(4); + let optional_index_len = + u32::from_le_bytes(optional_index_len.as_slice().try_into().unwrap()); + let (optional_index_bytes, start_index_bytes) = + body_bytes.split(optional_index_len as usize); + let optional_index = open_optional_index(optional_index_bytes)?; + let start_index_column: Arc> = + load_u64_based_column_values(start_index_bytes)?; + Ok(MultiValueIndex::MultiValueIndexV2(MultiValueIndexV2 { + optional_index, + start_index_column, + })) + } + } +} + +#[derive(Clone)] +/// Index to resolve value range for given doc_id. +/// Starts at 0. +pub enum MultiValueIndex { + MultiValueIndexV1(MultiValueIndexV1), + MultiValueIndexV2(MultiValueIndexV2), +} + +#[derive(Clone)] +/// Index to resolve value range for given doc_id. +/// Starts at 0. +pub struct MultiValueIndexV1 { + pub start_index_column: Arc>, +} + +impl MultiValueIndexV1 { + /// Returns `[start, end)`, such that the values associated with + /// the given document are `start..end`. + #[inline] + pub(crate) fn range(&self, doc_id: DocId) -> Range { + if doc_id >= self.num_docs() { + return 0..0; + } + let start = self.start_index_column.get_val(doc_id); + let end = self.start_index_column.get_val(doc_id + 1); + start..end + } + + /// Returns the number of documents in the index. + #[inline] + pub fn num_docs(&self) -> u32 { + self.start_index_column.num_vals() - 1 + } + + /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of + /// docids. Positions are converted inplace to docids. + /// + /// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the + /// index. + /// + /// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically + /// increasing positions. + /// + /// TODO: Instead of a linear scan we can employ a exponential search into binary search to + /// match a docid to its value position. + pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { + if ranks.is_empty() { + return; + } + let mut cur_doc = docid_start; + let mut last_doc = None; + + assert!(self.start_index_column.get_val(docid_start) <= ranks[0]); + + let mut write_doc_pos = 0; + for i in 0..ranks.len() { + let pos = ranks[i]; + loop { + let end = self.start_index_column.get_val(cur_doc + 1); + if end > pos { + ranks[write_doc_pos] = cur_doc; + write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 }; + last_doc = Some(cur_doc); + break; + } + cur_doc += 1; + } + } + ranks.truncate(write_doc_pos); + } } #[derive(Clone)] /// Index to resolve value range for given doc_id. /// Starts at 0. -pub struct MultiValueIndex { +pub struct MultiValueIndexV2 { + pub optional_index: OptionalIndex, pub start_index_column: Arc>, } impl std::fmt::Debug for MultiValueIndex { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let index = match self { + MultiValueIndex::MultiValueIndexV1(idx) => &idx.start_index_column, + MultiValueIndex::MultiValueIndexV2(idx) => &idx.start_index_column, + }; f.debug_struct("MultiValuedIndex") - .field("num_rows", &self.start_index_column.num_vals()) + .field("num_rows", &index.num_vals()) .finish_non_exhaustive() } } -impl From>> for MultiValueIndex { - fn from(start_index_column: Arc>) -> Self { - MultiValueIndex { start_index_column } - } -} - impl MultiValueIndex { pub fn for_test(start_offsets: &[RowId]) -> MultiValueIndex { + assert!(!start_offsets.is_empty()); + assert_eq!(start_offsets[0], 0); + let mut doc_with_values = Vec::new(); + let mut compact_start_offsets: Vec = vec![0]; + for doc in 0..start_offsets.len() - 1 { + if start_offsets[doc] < start_offsets[doc + 1] { + doc_with_values.push(doc as RowId); + compact_start_offsets.push(start_offsets[doc + 1]); + } + } + let serializable_multivalued_index = SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids: Box::new(&doc_with_values[..]), + num_rows: start_offsets.len() as u32 - 1, + }, + start_offsets: Box::new(&compact_start_offsets[..]), + }; let mut buffer = Vec::new(); - serialize_multivalued_index(&start_offsets, &mut buffer).unwrap(); + serialize_multivalued_index(&serializable_multivalued_index, &mut buffer).unwrap(); let bytes = OwnedBytes::new(buffer); - open_multivalued_index(bytes).unwrap() + open_multivalued_index(bytes, Version::V2).unwrap() + } + + pub fn get_start_index_column(&self) -> &Arc> { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => &idx.start_index_column, + MultiValueIndex::MultiValueIndexV2(idx) => &idx.start_index_column, + } } + /// Returns `[start, end)` values range, such that the values associated with + /// the given document are `start..end`. + #[inline] + pub(crate) fn range(&self, doc_id: DocId) -> Range { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => idx.range(doc_id), + MultiValueIndex::MultiValueIndexV2(idx) => idx.range(doc_id), + } + } + + /// Returns the number of documents in the index. + #[inline] + pub fn num_docs(&self) -> u32 { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => idx.start_index_column.num_vals() - 1, + MultiValueIndex::MultiValueIndexV2(idx) => idx.optional_index.num_docs(), + } + } + + /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of + /// docids. Positions are converted inplace to docids. + /// + /// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the + /// index. + /// + /// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically + /// increasing positions. + /// + /// TODO: Instead of a linear scan we can employ a exponential search into binary search to + /// match a docid to its value position. + pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { + match self { + MultiValueIndex::MultiValueIndexV1(idx) => { + idx.select_batch_in_place(docid_start, ranks) + } + MultiValueIndex::MultiValueIndexV2(idx) => { + idx.select_batch_in_place(docid_start, ranks) + } + } + } +} +impl MultiValueIndexV2 { /// Returns `[start, end)`, such that the values associated with /// the given document are `start..end`. #[inline] pub(crate) fn range(&self, doc_id: DocId) -> Range { - let start = self.start_index_column.get_val(doc_id); - let end = self.start_index_column.get_val(doc_id + 1); + let Some(rank) = self.optional_index.rank_if_exists(doc_id) else { + return 0..0; + }; + let start = self.start_index_column.get_val(rank); + let end = self.start_index_column.get_val(rank + 1); start..end } /// Returns the number of documents in the index. #[inline] pub fn num_docs(&self) -> u32 { - self.start_index_column.num_vals() - 1 + self.optional_index.num_docs() } /// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of @@ -83,31 +267,38 @@ impl MultiValueIndex { /// /// TODO: Instead of a linear scan we can employ a exponential search into binary search to /// match a docid to its value position. - #[allow(clippy::bool_to_int_with_if)] pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec) { if ranks.is_empty() { return; } - let mut cur_doc = docid_start; + let mut cur_pos_in_idx = self.optional_index.rank(docid_start); let mut last_doc = None; - assert!(self.start_index_column.get_val(docid_start) <= ranks[0]); + assert!(cur_pos_in_idx <= ranks[0]); let mut write_doc_pos = 0; for i in 0..ranks.len() { let pos = ranks[i]; loop { - let end = self.start_index_column.get_val(cur_doc + 1); + let end = self.start_index_column.get_val(cur_pos_in_idx + 1); if end > pos { - ranks[write_doc_pos] = cur_doc; - write_doc_pos += if last_doc == Some(cur_doc) { 0 } else { 1 }; - last_doc = Some(cur_doc); + ranks[write_doc_pos] = cur_pos_in_idx; + write_doc_pos += if last_doc == Some(cur_pos_in_idx) { + 0 + } else { + 1 + }; + last_doc = Some(cur_pos_in_idx); break; } - cur_doc += 1; + cur_pos_in_idx += 1; } } ranks.truncate(write_doc_pos); + + for rank in ranks.iter_mut() { + *rank = self.optional_index.select(*rank); + } } } @@ -116,6 +307,7 @@ mod tests { use std::ops::Range; use super::MultiValueIndex; + use crate::{ColumnarReader, DynamicColumn}; fn index_to_pos_helper( index: &MultiValueIndex, @@ -134,6 +326,7 @@ mod tests { let positions = &[10u32, 11, 15, 20, 21, 22]; assert_eq!(index_to_pos_helper(&index, 0..5, positions), vec![1, 3, 4]); assert_eq!(index_to_pos_helper(&index, 1..5, positions), vec![1, 3, 4]); + assert_eq!(index_to_pos_helper(&index, 0..5, &[9]), vec![0]); assert_eq!(index_to_pos_helper(&index, 1..5, &[10]), vec![1]); assert_eq!(index_to_pos_helper(&index, 1..5, &[11]), vec![1]); @@ -141,4 +334,67 @@ mod tests { assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14]), vec![2]); assert_eq!(index_to_pos_helper(&index, 2..5, &[12, 14, 15]), vec![2, 3]); } + + #[test] + fn test_range_to_rowids() { + use crate::ColumnarWriter; + + let mut columnar_writer = ColumnarWriter::default(); + + // This column gets coerced to u64 + columnar_writer.record_numerical(1, "full", u64::MAX); + columnar_writer.record_numerical(1, "full", u64::MAX); + + columnar_writer.record_numerical(5, "full", u64::MAX); + columnar_writer.record_numerical(5, "full", u64::MAX); + + let mut wrt: Vec = Vec::new(); + columnar_writer.serialize(7, &mut wrt).unwrap(); + + let reader = ColumnarReader::open(wrt).unwrap(); + // Open the column as u64 + let column = reader.read_columns("full").unwrap()[0] + .open() + .unwrap() + .coerce_numerical(crate::NumericalType::U64) + .unwrap(); + let DynamicColumn::U64(column) = column else { + panic!(); + }; + + let row_id_range = column.index.docid_range_to_rowids(1..2); + assert_eq!(row_id_range, 0..2); + + let row_id_range = column.index.docid_range_to_rowids(0..2); + assert_eq!(row_id_range, 0..2); + + let row_id_range = column.index.docid_range_to_rowids(0..4); + assert_eq!(row_id_range, 0..2); + + let row_id_range = column.index.docid_range_to_rowids(3..4); + assert_eq!(row_id_range, 2..2); + + let row_id_range = column.index.docid_range_to_rowids(1..6); + assert_eq!(row_id_range, 0..4); + + let row_id_range = column.index.docid_range_to_rowids(3..6); + assert_eq!(row_id_range, 2..4); + + let row_id_range = column.index.docid_range_to_rowids(0..6); + assert_eq!(row_id_range, 0..4); + + let row_id_range = column.index.docid_range_to_rowids(0..6); + assert_eq!(row_id_range, 0..4); + + let check = |range, expected| { + let full_range = 0..=u64::MAX; + let mut docids = Vec::new(); + column.get_docids_for_value_range(full_range, range, &mut docids); + assert_eq!(docids, expected); + }; + + // check(0..1, vec![]); + // check(0..2, vec![1]); + check(1..2, vec![1]); + } } diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index bd12ade194..a615f78a0d 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -86,8 +86,14 @@ pub struct OptionalIndex { block_metas: Arc<[BlockMeta]>, } +impl<'a> Iterable for &'a OptionalIndex { + fn boxed_iter(&self) -> Box + '_> { + Box::new(self.iter_rows()) + } +} + impl std::fmt::Debug for OptionalIndex { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("OptionalIndex") .field("num_rows", &self.num_rows) .field("num_non_null_rows", &self.num_non_null_rows) diff --git a/columnar/src/column_index/optional_index/set.rs b/columnar/src/column_index/optional_index/set.rs index b2bf9cbe2c..25f90596f4 100644 --- a/columnar/src/column_index/optional_index/set.rs +++ b/columnar/src/column_index/optional_index/set.rs @@ -28,10 +28,11 @@ pub trait Set { /// Returns true if the elements is contained in the Set fn contains(&self, el: T) -> bool; - /// Returns the number of rows in the set that are < `el` + /// Returns the element's rank (its position in the set). + /// If the set does not contain the element, it will return the next existing elements rank. fn rank(&self, el: T) -> T; - /// If the set contains `el` returns the element rank. + /// If the set contains `el`, returns the element's rank (its position in the set). /// If the set does not contain the element, it returns `None`. fn rank_if_exists(&self, el: T) -> Option; diff --git a/columnar/src/column_index/optional_index/set_block/tests.rs b/columnar/src/column_index/optional_index/set_block/tests.rs index 8957ada25a..b6fe5ba4db 100644 --- a/columnar/src/column_index/optional_index/set_block/tests.rs +++ b/columnar/src/column_index/optional_index/set_block/tests.rs @@ -22,8 +22,8 @@ fn test_set_helper>(vals: &[u16]) -> usize { vals.iter().cloned().take_while(|v| *v < val).count() as u16 ); } - for rank in 0..vals.len() { - assert_eq!(tested_set.select(rank as u16), vals[rank]); + for (rank, val) in vals.iter().enumerate() { + assert_eq!(tested_set.select(rank as u16), *val); } buffer.len() } @@ -107,3 +107,41 @@ fn test_simple_translate_codec_idx_to_original_idx_dense() { assert_eq!(i, select_cursor.select(i)); } } + +#[test] +fn test_simple_translate_idx_to_value_idx_dense() { + let mut buffer = Vec::new(); + DenseBlockCodec::serialize([1, 10].iter().copied(), &mut buffer).unwrap(); + let tested_set = DenseBlockCodec::open(buffer.as_slice()); + assert!(tested_set.contains(1)); + assert!(!tested_set.contains(2)); + assert_eq!(tested_set.rank(0), 0); + assert_eq!(tested_set.rank(1), 0); + for rank in 2..10 { + // ranks that don't exist select the next highest one + assert_eq!(tested_set.rank_if_exists(rank), None); + assert_eq!(tested_set.rank(rank), 1); + } + assert_eq!(tested_set.rank(10), 1); +} + +#[test] +fn test_simple_translate_idx_to_value_idx_sparse() { + let mut buffer = Vec::new(); + SparseBlockCodec::serialize([1, 10].iter().copied(), &mut buffer).unwrap(); + let tested_set = SparseBlockCodec::open(buffer.as_slice()); + assert!(tested_set.contains(1)); + assert!(!tested_set.contains(2)); + assert_eq!(tested_set.rank(0), 0); + assert_eq!(tested_set.select(tested_set.rank(0)), 1); + assert_eq!(tested_set.rank(1), 0); + assert_eq!(tested_set.select(tested_set.rank(1)), 1); + for rank in 2..10 { + // ranks that don't exist select the next highest one + assert_eq!(tested_set.rank_if_exists(rank), None); + assert_eq!(tested_set.rank(rank), 1); + assert_eq!(tested_set.select(tested_set.rank(rank)), 10); + } + assert_eq!(tested_set.rank(10), 1); + assert_eq!(tested_set.select(tested_set.rank(10)), 10); +} diff --git a/columnar/src/column_index/serialize.rs b/columnar/src/column_index/serialize.rs index f2a99c740c..107faf0dd2 100644 --- a/columnar/src/column_index/serialize.rs +++ b/columnar/src/column_index/serialize.rs @@ -3,28 +3,39 @@ use std::io::Write; use common::{CountingWriter, OwnedBytes}; +use super::multivalued_index::SerializableMultivalueIndex; +use super::OptionalIndex; use crate::column_index::multivalued_index::serialize_multivalued_index; use crate::column_index::optional_index::serialize_optional_index; use crate::column_index::ColumnIndex; use crate::iterable::Iterable; -use crate::{Cardinality, RowId}; +use crate::{Cardinality, RowId, Version}; + +pub struct SerializableOptionalIndex<'a> { + pub non_null_row_ids: Box + 'a>, + pub num_rows: RowId, +} + +impl<'a> From<&'a OptionalIndex> for SerializableOptionalIndex<'a> { + fn from(optional_index: &'a OptionalIndex) -> Self { + SerializableOptionalIndex { + non_null_row_ids: Box::new(optional_index), + num_rows: optional_index.num_docs(), + } + } +} pub enum SerializableColumnIndex<'a> { Full, - Optional { - non_null_row_ids: Box + 'a>, - num_rows: RowId, - }, - // TODO remove the Arc apart from serialization this is not - // dynamic at all. - Multivalued(Box + 'a>), + Optional(SerializableOptionalIndex<'a>), + Multivalued(SerializableMultivalueIndex<'a>), } impl<'a> SerializableColumnIndex<'a> { pub fn get_cardinality(&self) -> Cardinality { match self { SerializableColumnIndex::Full => Cardinality::Full, - SerializableColumnIndex::Optional { .. } => Cardinality::Optional, + SerializableColumnIndex::Optional(_) => Cardinality::Optional, SerializableColumnIndex::Multivalued(_) => Cardinality::Multivalued, } } @@ -40,12 +51,12 @@ pub fn serialize_column_index( output.write_all(&[cardinality])?; match column_index { SerializableColumnIndex::Full => {} - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids, num_rows, - } => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, + }) => serialize_optional_index(non_null_row_ids.as_ref(), num_rows, &mut output)?, SerializableColumnIndex::Multivalued(multivalued_index) => { - serialize_multivalued_index(&*multivalued_index, &mut output)? + serialize_multivalued_index(&multivalued_index, &mut output)? } } let column_index_num_bytes = output.written_bytes() as u32; @@ -53,7 +64,10 @@ pub fn serialize_column_index( } /// Open a serialized column index. -pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result { +pub fn open_column_index( + mut bytes: OwnedBytes, + format_version: Version, +) -> io::Result { if bytes.is_empty() { return Err(io::Error::new( io::ErrorKind::UnexpectedEof, @@ -70,7 +84,8 @@ pub fn open_column_index(mut bytes: OwnedBytes) -> io::Result { Ok(ColumnIndex::Optional(optional_index)) } Cardinality::Multivalued => { - let multivalue_index = super::multivalued_index::open_multivalued_index(bytes)?; + let multivalue_index = + super::multivalued_index::open_multivalued_index(bytes, format_version)?; Ok(ColumnIndex::Multivalued(multivalue_index)) } } diff --git a/columnar/src/columnar/format_version.rs b/columnar/src/columnar/format_version.rs index a46913be84..1314fb7540 100644 --- a/columnar/src/columnar/format_version.rs +++ b/columnar/src/columnar/format_version.rs @@ -11,7 +11,7 @@ const MAGIC_BYTES: [u8; 4] = [2, 113, 119, 66]; pub fn footer() -> [u8; VERSION_FOOTER_NUM_BYTES] { let mut footer_bytes = [0u8; VERSION_FOOTER_NUM_BYTES]; - footer_bytes[0..4].copy_from_slice(&Version::V1.to_bytes()); + footer_bytes[0..4].copy_from_slice(&CURRENT_VERSION.to_bytes()); footer_bytes[4..8].copy_from_slice(&MAGIC_BYTES[..]); footer_bytes } @@ -23,18 +23,20 @@ pub fn parse_footer(footer_bytes: [u8; VERSION_FOOTER_NUM_BYTES]) -> Result fmt::Result { match self { Version::V1 => write!(f, "v1"), + Version::V2 => write!(f, "v2"), } } } @@ -48,6 +50,7 @@ impl Version { let code = u32::from_le_bytes(bytes); match code { 1u32 => Ok(Version::V1), + 2u32 => Ok(Version::V2), _ => Err(InvalidData), } } @@ -60,9 +63,9 @@ mod tests { use super::*; #[test] - fn test_footer_dserialization() { + fn test_footer_deserialization() { let parsed_version: Version = parse_footer(footer()).unwrap(); - assert_eq!(Version::V1, parsed_version); + assert_eq!(Version::V2, parsed_version); } #[test] @@ -76,11 +79,10 @@ mod tests { for &i in &version_to_tests { let version_res = Version::try_from_bytes(i.to_le_bytes()); if let Ok(version) = version_res { - assert_eq!(version, Version::V1); assert_eq!(version.to_bytes(), i.to_le_bytes()); valid_versions.insert(i); } } - assert_eq!(valid_versions.len(), 1); + assert_eq!(valid_versions.len(), 2); } } diff --git a/columnar/src/columnar/merge/merge_mapping.rs b/columnar/src/columnar/merge/merge_mapping.rs index 078ed44bb9..8428861821 100644 --- a/columnar/src/columnar/merge/merge_mapping.rs +++ b/columnar/src/columnar/merge/merge_mapping.rs @@ -59,7 +59,6 @@ pub enum MergeRowOrder { Stack(StackMergeOrder), /// Some more complex mapping, that may interleaves rows from the different readers and /// drop rows, or do both. - /// TODO: remove ordering part here Shuffled(ShuffleMergeOrder), } diff --git a/columnar/src/columnar/merge/mod.rs b/columnar/src/columnar/merge/mod.rs index 9f7666e8fe..d970d68615 100644 --- a/columnar/src/columnar/merge/mod.rs +++ b/columnar/src/columnar/merge/mod.rs @@ -7,7 +7,6 @@ use std::io; use std::net::Ipv6Addr; use std::sync::Arc; -use itertools::Itertools; pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder}; use super::writer::ColumnarSerializer; @@ -371,20 +370,8 @@ fn is_empty_after_merge( true } ColumnIndex::Multivalued(multivalued_index) => { - for (doc_id, (start_index, end_index)) in multivalued_index - .start_index_column - .iter() - .tuple_windows() - .enumerate() - { - let doc_id = doc_id as u32; - if start_index == end_index { - // There are no values in this document - continue; - } - // The document contains values and is present in the alive bitset. - // The column is therefore not empty. - if alive_bitset.contains(doc_id) { + for alive_docid in alive_bitset.iter() { + if !multivalued_index.range(alive_docid).is_empty() { return false; } } diff --git a/columnar/src/columnar/merge/tests.rs b/columnar/src/columnar/merge/tests.rs index 697fe3d246..1419bf2c98 100644 --- a/columnar/src/columnar/merge/tests.rs +++ b/columnar/src/columnar/merge/tests.rs @@ -1,3 +1,5 @@ +use itertools::Itertools; + use super::*; use crate::{Cardinality, ColumnarWriter, HasAssociatedColumnType, RowId}; diff --git a/columnar/src/columnar/reader/mod.rs b/columnar/src/columnar/reader/mod.rs index 23af3f0eea..f850e4f6f6 100644 --- a/columnar/src/columnar/reader/mod.rs +++ b/columnar/src/columnar/reader/mod.rs @@ -6,7 +6,7 @@ use sstable::{Dictionary, RangeSSTable}; use crate::columnar::{format_version, ColumnType}; use crate::dynamic_column::DynamicColumnHandle; -use crate::RowId; +use crate::{RowId, Version}; fn io_invalid_data(msg: String) -> io::Error { io::Error::new(io::ErrorKind::InvalidData, msg) @@ -19,6 +19,7 @@ pub struct ColumnarReader { column_dictionary: Dictionary, column_data: FileSlice, num_rows: RowId, + format_version: Version, } impl fmt::Debug for ColumnarReader { @@ -53,6 +54,7 @@ impl fmt::Debug for ColumnarReader { fn read_all_columns_in_stream( mut stream: sstable::Streamer<'_, RangeSSTable>, column_data: &FileSlice, + format_version: Version, ) -> io::Result> { let mut results = Vec::new(); while stream.advance() { @@ -67,6 +69,7 @@ fn read_all_columns_in_stream( let dynamic_column_handle = DynamicColumnHandle { file_slice, column_type, + format_version, }; results.push(dynamic_column_handle); } @@ -88,7 +91,7 @@ impl ColumnarReader { let num_rows = u32::deserialize(&mut &footer_bytes[8..12])?; let version_footer_bytes: [u8; format_version::VERSION_FOOTER_NUM_BYTES] = footer_bytes[12..].try_into().unwrap(); - let _version = format_version::parse_footer(version_footer_bytes)?; + let format_version = format_version::parse_footer(version_footer_bytes)?; let (column_data, sstable) = file_slice_without_sstable_len.split_from_end(sstable_len as usize); let column_dictionary = Dictionary::open(sstable)?; @@ -96,6 +99,7 @@ impl ColumnarReader { column_dictionary, column_data, num_rows, + format_version, }) } @@ -126,6 +130,7 @@ impl ColumnarReader { let column_handle = DynamicColumnHandle { file_slice, column_type, + format_version: self.format_version, }; Some((column_name, column_handle)) } else { @@ -167,7 +172,7 @@ impl ColumnarReader { .stream_for_column_range(column_name) .into_stream_async() .await?; - read_all_columns_in_stream(stream, &self.column_data) + read_all_columns_in_stream(stream, &self.column_data, self.format_version) } /// Get all columns for the given column name. @@ -176,7 +181,7 @@ impl ColumnarReader { /// different types. pub fn read_columns(&self, column_name: &str) -> io::Result> { let stream = self.stream_for_column_range(column_name).into_stream()?; - read_all_columns_in_stream(stream, &self.column_data) + read_all_columns_in_stream(stream, &self.column_data, self.format_version) } /// Return the number of columns in the columnar. diff --git a/columnar/src/columnar/writer/mod.rs b/columnar/src/columnar/writer/mod.rs index d5fda430c7..239a7422d6 100644 --- a/columnar/src/columnar/writer/mod.rs +++ b/columnar/src/columnar/writer/mod.rs @@ -12,7 +12,7 @@ use common::CountingWriter; pub(crate) use serializer::ColumnarSerializer; use stacker::{Addr, ArenaHashMap, MemoryArena}; -use crate::column_index::SerializableColumnIndex; +use crate::column_index::{SerializableColumnIndex, SerializableOptionalIndex}; use crate::column_values::{MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use crate::columnar::column_type::ColumnType; use crate::columnar::writer::column_writers::{ @@ -554,16 +554,16 @@ fn send_to_serialize_column_mappable_to_u128< let optional_index_builder = value_index_builders.borrow_optional_index_builder(); consume_operation_iterator(op_iterator, optional_index_builder, values); let optional_index = optional_index_builder.finish(num_rows); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { num_rows, non_null_row_ids: Box::new(optional_index), - } + }) } Cardinality::Multivalued => { let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); - let multivalued_index = multivalued_index_builder.finish(num_rows); - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + let serializable_multivalued_index = multivalued_index_builder.finish(num_rows); + SerializableColumnIndex::Multivalued(serializable_multivalued_index) } }; crate::column::serialize_column_mappable_to_u128( @@ -574,15 +574,6 @@ fn send_to_serialize_column_mappable_to_u128< Ok(()) } -fn sort_values_within_row_in_place(multivalued_index: &[RowId], values: &mut [u64]) { - let mut start_index: usize = 0; - for end_index in multivalued_index.iter().copied() { - let end_index = end_index as usize; - values[start_index..end_index].sort_unstable(); - start_index = end_index; - } -} - fn send_to_serialize_column_mappable_to_u64( op_iterator: impl Iterator>, cardinality: Cardinality, @@ -606,19 +597,22 @@ fn send_to_serialize_column_mappable_to_u64( let optional_index_builder = value_index_builders.borrow_optional_index_builder(); consume_operation_iterator(op_iterator, optional_index_builder, values); let optional_index = optional_index_builder.finish(num_rows); - SerializableColumnIndex::Optional { + SerializableColumnIndex::Optional(SerializableOptionalIndex { non_null_row_ids: Box::new(optional_index), num_rows, - } + }) } Cardinality::Multivalued => { let multivalued_index_builder = value_index_builders.borrow_multivalued_index_builder(); consume_operation_iterator(op_iterator, multivalued_index_builder, values); - let multivalued_index = multivalued_index_builder.finish(num_rows); + let serializable_multivalued_index = multivalued_index_builder.finish(num_rows); if sort_values_within_row { - sort_values_within_row_in_place(multivalued_index, values); + sort_values_within_row_in_place( + serializable_multivalued_index.start_offsets.boxed_iter(), + values, + ); } - SerializableColumnIndex::Multivalued(Box::new(multivalued_index)) + SerializableColumnIndex::Multivalued(serializable_multivalued_index) } }; crate::column::serialize_column_mappable_to_u64( @@ -629,6 +623,18 @@ fn send_to_serialize_column_mappable_to_u64( Ok(()) } +fn sort_values_within_row_in_place( + multivalued_index: impl Iterator, + values: &mut [u64], +) { + let mut start_index: usize = 0; + for end_index in multivalued_index { + let end_index = end_index as usize; + values[start_index..end_index].sort_unstable(); + start_index = end_index; + } +} + fn coerce_numerical_symbol( operation_iterator: impl Iterator>, ) -> impl Iterator> diff --git a/columnar/src/columnar/writer/value_index.rs b/columnar/src/columnar/writer/value_index.rs index ab57a7a7ff..a35432e3a4 100644 --- a/columnar/src/columnar/writer/value_index.rs +++ b/columnar/src/columnar/writer/value_index.rs @@ -1,3 +1,4 @@ +use crate::column_index::{SerializableMultivalueIndex, SerializableOptionalIndex}; use crate::iterable::Iterable; use crate::RowId; @@ -59,31 +60,47 @@ impl IndexBuilder for OptionalIndexBuilder { #[derive(Default)] pub struct MultivaluedIndexBuilder { - start_offsets: Vec, + doc_with_values: Vec, + start_offsets: Vec, total_num_vals_seen: u32, + current_row: RowId, + current_row_has_value: bool, } impl MultivaluedIndexBuilder { - pub fn finish(&mut self, num_docs: RowId) -> &[u32] { - self.start_offsets - .resize(num_docs as usize + 1, self.total_num_vals_seen); - &self.start_offsets[..] + pub fn finish(&mut self, num_docs: RowId) -> SerializableMultivalueIndex<'_> { + self.start_offsets.push(self.total_num_vals_seen); + let non_null_row_ids: Box> = Box::new(&self.doc_with_values[..]); + SerializableMultivalueIndex { + doc_ids_with_values: SerializableOptionalIndex { + non_null_row_ids, + num_rows: num_docs, + }, + start_offsets: Box::new(&self.start_offsets[..]), + } } fn reset(&mut self) { + self.doc_with_values.clear(); self.start_offsets.clear(); - self.start_offsets.push(0u32); self.total_num_vals_seen = 0; + self.current_row = 0; + self.current_row_has_value = false; } } impl IndexBuilder for MultivaluedIndexBuilder { fn record_row(&mut self, row_id: RowId) { - self.start_offsets - .resize(row_id as usize + 1, self.total_num_vals_seen); + self.current_row = row_id; + self.current_row_has_value = false; } fn record_value(&mut self) { + if !self.current_row_has_value { + self.current_row_has_value = true; + self.doc_with_values.push(self.current_row); + self.start_offsets.push(self.total_num_vals_seen); + } self.total_num_vals_seen += 1; } } @@ -141,6 +158,32 @@ mod tests { ); } + #[test] + fn test_multivalued_value_index_builder_simple() { + let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); + { + multivalued_value_index_builder.record_row(0u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32); + let start_offsets: Vec = serialized_multivalue_index + .start_offsets + .boxed_iter() + .collect(); + assert_eq!(&start_offsets, &[0, 2]); + } + multivalued_value_index_builder.reset(); + multivalued_value_index_builder.record_row(0u32); + multivalued_value_index_builder.record_value(); + multivalued_value_index_builder.record_value(); + let serialized_multivalue_index = multivalued_value_index_builder.finish(1u32); + let start_offsets: Vec = serialized_multivalue_index + .start_offsets + .boxed_iter() + .collect(); + assert_eq!(&start_offsets, &[0, 2]); + } + #[test] fn test_multivalued_value_index_builder() { let mut multivalued_value_index_builder = MultivaluedIndexBuilder::default(); @@ -149,17 +192,15 @@ mod tests { multivalued_value_index_builder.record_value(); multivalued_value_index_builder.record_row(2u32); multivalued_value_index_builder.record_value(); - assert_eq!( - multivalued_value_index_builder.finish(4u32).to_vec(), - vec![0, 0, 2, 3, 3] - ); - multivalued_value_index_builder.reset(); - multivalued_value_index_builder.record_row(2u32); - multivalued_value_index_builder.record_value(); - multivalued_value_index_builder.record_value(); - assert_eq!( - multivalued_value_index_builder.finish(4u32).to_vec(), - vec![0, 0, 0, 2, 2] - ); + let SerializableMultivalueIndex { + doc_ids_with_values, + start_offsets, + } = multivalued_value_index_builder.finish(4u32); + assert_eq!(doc_ids_with_values.num_rows, 4u32); + let doc_ids_with_values: Vec = + doc_ids_with_values.non_null_row_ids.boxed_iter().collect(); + assert_eq!(&doc_ids_with_values, &[1u32, 2u32]); + let start_offsets: Vec = start_offsets.boxed_iter().collect(); + assert_eq!(&start_offsets[..], &[0, 2, 3]); } } diff --git a/columnar/src/compat_tests.rs b/columnar/src/compat_tests.rs index cbb11333ec..8a504ab266 100644 --- a/columnar/src/compat_tests.rs +++ b/columnar/src/compat_tests.rs @@ -1,24 +1,29 @@ use std::path::PathBuf; -use crate::{Column, ColumnarReader, DynamicColumn, CURRENT_VERSION}; +use itertools::Itertools; + +use crate::{ + merge_columnar, Cardinality, Column, ColumnarReader, DynamicColumn, StackMergeOrder, + CURRENT_VERSION, +}; const NUM_DOCS: u32 = u16::MAX as u32; -fn generate_columnar(num_docs: u32) -> Vec { +fn generate_columnar(num_docs: u32, value_offset: u64) -> Vec { use crate::ColumnarWriter; let mut columnar_writer = ColumnarWriter::default(); for i in 0..num_docs { if i % 100 == 0 { - columnar_writer.record_numerical(i, "sparse", i as u64); + columnar_writer.record_numerical(i, "sparse", value_offset + i as u64); } - if i % 2 == 0 { - columnar_writer.record_numerical(i, "dense", i as u64); + if i % 5 == 0 { + columnar_writer.record_numerical(i, "dense", value_offset + i as u64); } - columnar_writer.record_numerical(i, "full", i as u64); - columnar_writer.record_numerical(i, "multi", i as u64); - columnar_writer.record_numerical(i, "multi", i as u64); + columnar_writer.record_numerical(i, "full", value_offset + i as u64); + columnar_writer.record_numerical(i, "multi", value_offset + i as u64); + columnar_writer.record_numerical(i, "multi", value_offset + i as u64); } let mut wrt: Vec = Vec::new(); @@ -35,7 +40,7 @@ fn create_format() { if PathBuf::from(file_path.clone()).exists() { return; } - let columnar = generate_columnar(NUM_DOCS); + let columnar = generate_columnar(NUM_DOCS, 0); std::fs::write(file_path, columnar).unwrap(); } @@ -49,27 +54,120 @@ fn test_format_v1() { test_format(&path); } +#[test] +fn test_format_v2() { + let path = path_for_version("v2"); + test_format(&path); +} + fn test_format(path: &str) { let file_content = std::fs::read(path).unwrap(); let reader = ColumnarReader::open(file_content).unwrap(); - let column = open_column(&reader, "full"); - assert_eq!(column.first(0).unwrap(), 0); - assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1); + check_columns(&reader); + + // Test merge + let reader2 = ColumnarReader::open(generate_columnar(NUM_DOCS, NUM_DOCS as u64)).unwrap(); + let columnar_readers = vec![&reader, &reader2]; + let merge_row_order = StackMergeOrder::stack(&columnar_readers[..]); + let mut out = Vec::new(); + merge_columnar(&columnar_readers, &[], merge_row_order.into(), &mut out).unwrap(); + let reader = ColumnarReader::open(out).unwrap(); + check_columns(&reader); +} - let column = open_column(&reader, "multi"); - assert_eq!(column.first(0).unwrap(), 0); - assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1); +fn check_columns(reader: &ColumnarReader) { + let column = open_column(reader, "full"); + check_column(&column, |doc_id| vec![(doc_id, doc_id as u64).into()]); + assert_eq!(column.get_cardinality(), Cardinality::Full); + + let column = open_column(reader, "multi"); + check_column(&column, |doc_id| { + vec![ + (doc_id * 2, doc_id as u64).into(), + (doc_id * 2 + 1, doc_id as u64).into(), + ] + }); + assert_eq!(column.get_cardinality(), Cardinality::Multivalued); + + let column = open_column(reader, "sparse"); + check_column(&column, |doc_id| { + if doc_id % 100 == 0 { + vec![(doc_id / 100, doc_id as u64).into()] + } else { + vec![] + } + }); + assert_eq!(column.get_cardinality(), Cardinality::Optional); + + let column = open_column(reader, "dense"); + check_column(&column, |doc_id| { + if doc_id % 5 == 0 { + vec![(doc_id / 5, doc_id as u64).into()] + } else { + vec![] + } + }); + assert_eq!(column.get_cardinality(), Cardinality::Optional); +} - let column = open_column(&reader, "sparse"); - assert_eq!(column.first(0).unwrap(), 0); - assert_eq!(column.first(NUM_DOCS - 1), None); - assert_eq!(column.first(65000), Some(65000)); +struct RowIdAndValue { + row_id: u32, + value: u64, +} +impl From<(u32, u64)> for RowIdAndValue { + fn from((row_id, value): (u32, u64)) -> Self { + Self { row_id, value } + } +} - let column = open_column(&reader, "dense"); - assert_eq!(column.first(0).unwrap(), 0); - assert_eq!(column.first(NUM_DOCS - 1).unwrap(), NUM_DOCS as u64 - 1); - assert_eq!(column.first(NUM_DOCS - 2), None); +fn check_column Vec>(column: &Column, expected: F) { + let num_docs = column.num_docs(); + let test_doc = |doc: u32| { + if expected(doc).is_empty() { + assert_eq!(column.first(doc), None); + } else { + assert_eq!(column.first(doc), Some(expected(doc)[0].value)); + } + let values = column.values_for_doc(doc).collect_vec(); + assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec()); + let mut row_ids = Vec::new(); + column.row_ids_for_docs(&[doc], &mut vec![], &mut row_ids); + assert_eq!( + row_ids, + expected(doc).iter().map(|x| x.row_id).collect_vec() + ); + let values = column.values_for_doc(doc).collect_vec(); + assert_eq!(values, expected(doc).iter().map(|x| x.value).collect_vec()); + + // Docid rowid conversion + let mut row_ids = Vec::new(); + let safe_next_doc = |doc: u32| (doc + 1).min(num_docs - 1); + column + .index + .docids_to_rowids(&[doc, safe_next_doc(doc)], &mut vec![], &mut row_ids); + let expected_rowids = expected(doc) + .iter() + .map(|x| x.row_id) + .chain(expected(safe_next_doc(doc)).iter().map(|x| x.row_id)) + .collect_vec(); + assert_eq!(row_ids, expected_rowids); + let rowid_range = column + .index + .docid_range_to_rowids(doc..safe_next_doc(doc) + 1); + if expected_rowids.is_empty() { + assert!(rowid_range.is_empty()); + } else { + assert_eq!( + rowid_range, + expected_rowids[0]..expected_rowids.last().unwrap() + 1 + ); + } + }; + test_doc(0); + test_doc(num_docs - 1); + test_doc(num_docs - 2); + test_doc(65000); } fn open_column(reader: &ColumnarReader, name: &str) -> Column { diff --git a/columnar/src/dynamic_column.rs b/columnar/src/dynamic_column.rs index 0a18d42074..2b9d69770c 100644 --- a/columnar/src/dynamic_column.rs +++ b/columnar/src/dynamic_column.rs @@ -8,7 +8,7 @@ use common::{ByteCount, DateTime, HasLen, OwnedBytes}; use crate::column::{BytesColumn, Column, StrColumn}; use crate::column_values::{monotonic_map_column, StrictlyMonotonicFn}; use crate::columnar::ColumnType; -use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType}; +use crate::{Cardinality, ColumnIndex, ColumnValues, NumericalType, Version}; #[derive(Clone)] pub enum DynamicColumn { @@ -232,6 +232,7 @@ static_dynamic_conversions!(Column, IpAddr); pub struct DynamicColumnHandle { pub(crate) file_slice: FileSlice, pub(crate) column_type: ColumnType, + pub(crate) format_version: Version, } impl DynamicColumnHandle { @@ -260,11 +261,15 @@ impl DynamicColumnHandle { let column_bytes = self.file_slice.read_bytes()?; match self.column_type { ColumnType::Str | ColumnType::Bytes => { - let column: BytesColumn = crate::column::open_column_bytes(column_bytes)?; + let column: BytesColumn = + crate::column::open_column_bytes(column_bytes, self.format_version)?; Ok(Some(column.term_ord_column)) } ColumnType::IpAddr => { - let column = crate::column::open_column_u128_as_compact_u64(column_bytes)?; + let column = crate::column::open_column_u128_as_compact_u64( + column_bytes, + self.format_version, + )?; Ok(Some(column)) } ColumnType::Bool @@ -272,7 +277,8 @@ impl DynamicColumnHandle { | ColumnType::U64 | ColumnType::F64 | ColumnType::DateTime => { - let column = crate::column::open_column_u64::(column_bytes)?; + let column = + crate::column::open_column_u64::(column_bytes, self.format_version)?; Ok(Some(column)) } } @@ -280,15 +286,31 @@ impl DynamicColumnHandle { fn open_internal(&self, column_bytes: OwnedBytes) -> io::Result { let dynamic_column: DynamicColumn = match self.column_type { - ColumnType::Bytes => crate::column::open_column_bytes(column_bytes)?.into(), - ColumnType::Str => crate::column::open_column_str(column_bytes)?.into(), - ColumnType::I64 => crate::column::open_column_u64::(column_bytes)?.into(), - ColumnType::U64 => crate::column::open_column_u64::(column_bytes)?.into(), - ColumnType::F64 => crate::column::open_column_u64::(column_bytes)?.into(), - ColumnType::Bool => crate::column::open_column_u64::(column_bytes)?.into(), - ColumnType::IpAddr => crate::column::open_column_u128::(column_bytes)?.into(), + ColumnType::Bytes => { + crate::column::open_column_bytes(column_bytes, self.format_version)?.into() + } + ColumnType::Str => { + crate::column::open_column_str(column_bytes, self.format_version)?.into() + } + ColumnType::I64 => { + crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + } + ColumnType::U64 => { + crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + } + ColumnType::F64 => { + crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + } + ColumnType::Bool => { + crate::column::open_column_u64::(column_bytes, self.format_version)?.into() + } + ColumnType::IpAddr => { + crate::column::open_column_u128::(column_bytes, self.format_version)? + .into() + } ColumnType::DateTime => { - crate::column::open_column_u64::(column_bytes)?.into() + crate::column::open_column_u64::(column_bytes, self.format_version)? + .into() } }; Ok(dynamic_column) diff --git a/columnar/src/iterable.rs b/columnar/src/iterable.rs index ec9c88665d..f59d37325b 100644 --- a/columnar/src/iterable.rs +++ b/columnar/src/iterable.rs @@ -1,4 +1,7 @@ use std::ops::Range; +use std::sync::Arc; + +use crate::{ColumnValues, RowId}; pub trait Iterable { fn boxed_iter(&self) -> Box + '_>; @@ -17,3 +20,9 @@ where Range: Iterator Box::new(self.clone()) } } + +impl Iterable for Arc> { + fn boxed_iter(&self) -> Box + '_> { + Box::new(self.iter().map(|row_id| row_id as u64)) + } +} diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index efdb9d050a..b3d435b3ec 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -79,7 +79,7 @@ fn test_dataframe_writer_u64_multivalued() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("divisor").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 29); + assert_eq!(cols[0].num_bytes(), 50); let dyn_i64_col = cols[0].open().unwrap(); let DynamicColumn::I64(divisor_col) = dyn_i64_col else { panic!(); @@ -304,7 +304,7 @@ fn column_value_strategy() -> impl Strategy { ip_addr_byte ))), 1 => any::().prop_map(ColumnValue::Bool), - 1 => (0_679_723_993i64..1_679_723_995i64) + 1 => (679_723_993i64..1_679_723_995i64) .prop_map(|val| { ColumnValue::DateTime(DateTime::from_timestamp_secs(val)) }) ] } @@ -392,6 +392,7 @@ fn assert_columnar_eq( } } +#[track_caller] fn assert_column_eq( left: &Column, right: &Column, @@ -740,24 +741,68 @@ fn columnar_docs_and_remap( proptest! { #![proptest_config(ProptestConfig::with_cases(1000))] #[test] - fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in columnar_docs_and_remap()) { - let shuffled_rows: Vec> = shuffle_merge_order.iter() - .map(|row_addr| columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone()) - .collect(); - let expected_merged_columnar = build_columnar(&shuffled_rows[..]); - let columnar_readers: Vec = columnar_docs.iter() - .map(|docs| build_columnar(&docs[..])) - .collect::>(); - let columnar_readers_arr: Vec<&ColumnarReader> = columnar_readers.iter().collect(); - let mut output: Vec = Vec::new(); - let segment_num_rows: Vec = columnar_docs.iter().map(|docs| docs.len() as RowId).collect(); - let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order); - crate::merge_columnar(&columnar_readers_arr[..], &[], shuffle_merge_order.into(), &mut output).unwrap(); - let merged_columnar = ColumnarReader::open(output).unwrap(); - assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true); + fn test_columnar_merge_and_remap_proptest((columnar_docs, shuffle_merge_order) in +columnar_docs_and_remap()) { + test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order); } } +fn test_columnar_merge_and_remap( + columnar_docs: Vec>>, + shuffle_merge_order: Vec, +) { + let shuffled_rows: Vec> = shuffle_merge_order + .iter() + .map(|row_addr| { + columnar_docs[row_addr.segment_ord as usize][row_addr.row_id as usize].clone() + }) + .collect(); + let expected_merged_columnar = build_columnar(&shuffled_rows[..]); + let columnar_readers: Vec = columnar_docs + .iter() + .map(|docs| build_columnar(&docs[..])) + .collect::>(); + let columnar_readers_ref: Vec<&ColumnarReader> = columnar_readers.iter().collect(); + let mut output: Vec = Vec::new(); + let segment_num_rows: Vec = columnar_docs + .iter() + .map(|docs| docs.len() as RowId) + .collect(); + let shuffle_merge_order = ShuffleMergeOrder::for_test(&segment_num_rows, shuffle_merge_order); + crate::merge_columnar( + &columnar_readers_ref[..], + &[], + shuffle_merge_order.into(), + &mut output, + ) + .unwrap(); + let merged_columnar = ColumnarReader::open(output).unwrap(); + assert_columnar_eq(&merged_columnar, &expected_merged_columnar, true); +} + +#[test] +fn test_columnar_merge_and_remap_bug_1() { + let columnar_docs = vec![vec![ + vec![ + ("c1", ColumnValue::Numerical(NumericalValue::U64(0))), + ("c1", ColumnValue::Numerical(NumericalValue::U64(0))), + ], + vec![], + ]]; + let shuffle_merge_order: Vec = vec![ + RowAddr { + segment_ord: 0, + row_id: 1, + }, + RowAddr { + segment_ord: 0, + row_id: 0, + }, + ]; + + test_columnar_merge_and_remap(columnar_docs, shuffle_merge_order); +} + #[test] fn test_columnar_merge_empty() { let columnar_reader_1 = build_columnar(&[]); diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 0b62a01782..4792574d06 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -2212,7 +2212,7 @@ mod tests { #[test] fn test_fast_field_range() { - let ops: Vec<_> = (0..1000).map(|id| IndexingOp::add(id)).collect(); + let ops: Vec<_> = (0..1000).map(IndexingOp::add).collect(); assert!(test_operation_strategy(&ops, true).is_ok()); }