[TST] More benchmark queries for regex #4910

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 4 commits into base: main
366 changes: 290 additions & 76 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -34,15 +34,15 @@ members = [
]

[workspace.dependencies]
arrow = "52.2.0"
arrow = "55.1"
async-trait = "0.1"
axum = { version = "0.8", features = ["macros"] }
bytes = "1.10"
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["derive"] }
criterion = { version = "0.5", features = ["async_tokio"] }
figment = { version = "0.10.12", features = ["env", "yaml", "test"] }
flatbuffers = "24.3.25"
flatbuffers = "25.2.10"
futures = "0.3"
futures-core = "0.3"
http-body-util = "0.1.3"
@@ -55,7 +55,7 @@ opentelemetry-otlp = { version = "0.27", features = ["http-proto"] }
opentelemetry-http = { version = "0.27", features = ["reqwest"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
parking_lot = { version = "0.12.3", features = ["serde"] }
parquet = "52"
parquet = { version = "55.1", features = ["async"] }
prost = "0.13"
prost-types = "0.13.5"
regex = "1.11.1"
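The `parquet` bump also turns on the crate's `async` feature, which provides the Tokio-based Arrow reader that the new dataset loader below depends on. A minimal sketch of what that feature enables (assuming parquet 55.x with `anyhow` and `tokio` available; the file path is a placeholder):

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder path; any local parquet file works.
    let file = File::open("sample.parquet").await?;
    // With the `async` feature, the builder reads metadata asynchronously
    // and yields a stream of Arrow RecordBatches.
    let stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```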
3 changes: 3 additions & 0 deletions rust/benchmark/Cargo.toml
@@ -8,12 +8,14 @@ path = "src/lib.rs"

[dependencies]
anyhow = "1.0.93"
+arrow = { workspace = true }
async-tempfile = "0.6.0"
async-compression = { version = "0.4.18", features = ["tokio", "gzip", "bzip2"] }

bincode = { workspace = true }
criterion = { workspace = true }
futures = { workspace = true }
+parquet = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -26,3 +28,4 @@ reqwest = { workspace = true, features = ["stream"] }
tokio-stream = { version = "0.1.16", features = ["full"] }
tokio-util = "0.7.12"
bloom = "0.3.2"
+hf-hub = { version = "0.4.1", features = ["tokio"] }
1 change: 1 addition & 0 deletions rust/benchmark/src/datasets/mod.rs
@@ -3,6 +3,7 @@ pub mod util;

pub mod gist;
pub mod ms_marco_queries;
+pub mod rust;
pub mod scidocs;
pub mod sift;
pub mod wikipedia;
56 changes: 45 additions & 11 deletions rust/benchmark/src/datasets/rust.rs
@@ -1,18 +1,52 @@
-use anyhow::{anyhow, Result};
-use futures::io::BufReader;
-use tokio::{
-    fs::{File, ReadDir},
-    io::AsyncWriteExt,
-};
+use std::path::PathBuf;

-use super::util::get_or_populate_cached_dataset_file;
+use anyhow::{Context, Result};
+use arrow::array::AsArray;
+use futures::{stream, StreamExt, TryStreamExt};
+use hf_hub::api::tokio::Api;
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use tokio::fs::File;

-pub struct RustStack {
-    pub train: BufReader<File>,
+pub struct TheStackDedupRust {
+    pub shard_paths: Vec<PathBuf>,
}

-impl RustStack {
+impl TheStackDedupRust {
    pub async fn init() -> Result<Self> {
-        let dir = read_dir("~/Desktop/rust-stack");
+        let mut shard_paths = Vec::new();
+        let api = Api::new()?;
+        let dataset = api.dataset("bigcode/the-stack-dedup".to_string());
+        for i in 0..21 {
+            let shard_path = format!("data/rust/data-{:05}-of-00021.parquet", i);
+            let local_path = dataset.get(&shard_path).await?;
+            shard_paths.push(local_path);
+        }
+
+        Ok(Self { shard_paths })
+    }
+
+    pub async fn documents(&self) -> Result<Vec<String>> {
+        let mut shard_streams = Vec::new();
+        for shard_path in &self.shard_paths {
+            let file = File::open(shard_path).await?;
+            let shard_stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
+            shard_streams.push(shard_stream);
+        }
+        let batches = stream::iter(shard_streams)
+            .flatten()
+            .try_collect::<Vec<_>>()
+            .await?;
+        let mut documents = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
+        for batch in batches {
+            documents.extend(
+                batch
+                    .column_by_name("content")
+                    .context("Inspecting content column")?
+                    .as_string::<i32>()
+                    .iter()
+                    .map(|doc| doc.unwrap_or_default().to_string()),
+            );
+        }
+        Ok(documents)
+    }
}
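For reference, a minimal consumer of the new dataset type might look like this (a sketch; it assumes network access to Hugging Face on the first run, after which the 21 shards are served from the local hf-hub cache):

```rust
use chroma_benchmark::datasets::rust::TheStackDedupRust;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Downloads (or reuses cached) parquet shards of the Rust subset
    // of bigcode/the-stack-dedup.
    let dataset = TheStackDedupRust::init().await?;
    // Decodes the `content` column of every shard into owned strings.
    let documents = dataset.documents().await?;
    println!("loaded {} Rust source files", documents.len());
    Ok(())
}
```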
8 changes: 4 additions & 4 deletions rust/benchmark/src/datasets/util.rs
@@ -25,17 +25,17 @@ async fn get_dataset_cache_path(

/// Calls the populate callback to create a cached dataset file if it doesn't exist, and returns the path to the cached file.
pub(crate) async fn get_or_populate_cached_dataset_file<F, Fut>(
-    dataset_name: &str,
-    file_name: &str,
+    dataset_name: impl AsRef<str>,
+    file_name: impl AsRef<str>,
    cache_dir: Option<PathBuf>,
    populate: F,
) -> Result<PathBuf>
where
    F: FnOnce(Box<dyn AsyncWrite + Unpin + Send>) -> Fut,
    Fut: Future<Output = Result<()>>,
{
-    let dataset_dir = get_dataset_cache_path(dataset_name, cache_dir).await?;
-    let file_path = dataset_dir.join(file_name);
+    let dataset_dir = get_dataset_cache_path(dataset_name.as_ref(), cache_dir).await?;
+    let file_path = dataset_dir.join(file_name.as_ref());

    if !file_path.exists() {
        // We assume that dataset creation was successful if the file exists, so we use a temporary file to avoid scenarios where the file is partially written and then the callback fails.
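Widening the parameters from `&str` to `impl AsRef<str>` lets call sites pass either borrowed or owned names, which matters once shard file names are built with `format!`. A standalone sketch of the ergonomic difference (hypothetical helper, not part of this PR):

```rust
// Hypothetical analogue of the widened signature.
fn cache_key(dataset_name: impl AsRef<str>, file_name: impl AsRef<str>) -> String {
    format!("{}/{}", dataset_name.as_ref(), file_name.as_ref())
}

fn main() {
    // A &str literal and an owned String both satisfy `impl AsRef<str>`,
    // so no `&` or `.as_str()` is needed at the call site.
    assert_eq!(cache_key("the-stack", "data.parquet"), "the-stack/data.parquet");
    let shard = 3;
    assert_eq!(
        cache_key("the-stack", format!("data-{shard:05}.parquet")),
        "the-stack/data-00003.parquet"
    );
}
```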
194 changes: 147 additions & 47 deletions rust/index/benches/literal.rs
@@ -1,7 +1,7 @@
-use std::fs::{read_dir, File};
-
-use arrow::array::AsArray;
-use chroma_benchmark::benchmark::{bench_run, tokio_multi_thread};
+use chroma_benchmark::{
+    benchmark::{bench_run, tokio_multi_thread},
+    datasets::rust::TheStackDedupRust,
+};
use chroma_blockstore::{
arrow::provider::ArrowBlockfileProvider, provider::BlockfileProvider, BlockfileWriterOptions,
};
@@ -10,56 +10,149 @@ use chroma_index::fulltext::types::{DocumentMutation, FullTextIndexReader, FullT
use chroma_storage::{local::LocalStorage, Storage};
use chroma_types::regex::{literal_expr::NgramLiteralProvider, ChromaRegex};
use criterion::{criterion_group, criterion_main, Criterion};
-use indicatif::{ParallelProgressIterator, ProgressIterator};
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
use tantivy::tokenizer::NgramTokenizer;
use tempfile::tempdir;

const BLOCK_SIZE: usize = 1 << 23;
-const MAX_CODE_LENGTH: usize = 1 << 12;

const FTS_PATTERNS: &[&str] = &[r"unreachable", r"unsafe", r"use std::collections::HashMap;"];
const FTS_PATTERNS: &[&str] = &[
r"std::ptr::",
r"env_logger::",
r"tracing::",
r"futures::",
r"tokio::",
r"async_std::",
r"crossbeam::",
r"atomic::",
r"mpsc::",
r"Some(",
r"Ok(",
r"Err(",
r"None",
r"unwrap()",
r"expect()",
r"clone()",
r"Box::new",
r"Rc::new",
r"RefCell::new",
r"debug!(",
r"error!(",
r"warn!(",
r"panic!(",
r"todo!(",
r"join!(",
r"select!(",
r"unimplemented!(",
r"std::mem::transmute",
r"std::ffi::",
r"thread::sleep",
r"std::fs::File::open",
r"std::net::TcpListener",
r"use serde::",
r"use rand::",
r"use tokio::",
r"use futures::",
r"use anyhow::",
r"use thiserror::",
r"use chrono::",
r"serde::Serialize",
r"serde::Deserialize",
r"regex::Regex::new",
r"chrono::DateTime",
r"uuid::Uuid::new_v4",
r"proc_macro::TokenStream",
r"assert_eq!(",
r"assert_ne!(",
r"#[allow(dead_code)]",
r"#[allow(unused)]",
r"#[allow(unused_variables)]",
r"#[allow(unused_mut)]",
r"#[allow",
r"#[deny",
r"#[warn",
r"#[cfg",
r"#[feature",
r"#[derive(",
r"#[proc_macro]",
r"#[proc_macro_derive(",
r"#[proc_macro_attribute]",
r"#[test]",
r"#[tokio::test]",
r"///",
r"//!",
r"test_",
r"_tmp",
r"_old",
];
const REGEX_PATTERNS: &[&str] = &[
r"\.collect::<.+>()",
r"(?i)(TODO|FIXME)",
r"!\[allow\(clippy::.+\)\]",
r"(?m)^\s*fn\s+\w+",
r"(?m)^\s*pub\s+fn\s+\w+",
r"(?m)^\s*async\s+fn\s+\w+",
r"(?m)^\s*pub\s+async\s+fn\s+\w+",
r"fn\s+\w+\s*\([^)]*\)\s*->\s*\w+",
r"fn\s+\w+\s*\([^)]*Result<[^>]+>",
r"fn\s+\w+\s*\([^)]*Option<[^>]+>",
r"(\w+)::(\w+)\(",
r"\w+\.\w+\(",
r"(?m)^\s*struct\s+\w+",
r"(?m)^\s*pub\s+struct\s+\w+",
r"(?m)^\s*enum\s+\w+",
r"(?m)^\s*pub\s+enum\s+\w+",
r"(?m)^\s*trait\s+\w+",
r"(?m)^\s*pub\s+trait\s+\w+",
r"impl\s+(\w+)\s+for\s+(\w+)",
r"impl\s+(\w+)",
r"impl\s*<.*>\s*\w+",
r"\bSelf::\w+\(",
r"(?m)^\s*unsafe\s+fn\s+",
r"(?m)^\s*unsafe\s+\{",
r"\bunsafe\b",
r"fn\s+\w+\s*<",
r"struct\s+\w+\s*<",
r"enum\s+\w+\s*<",
r"impl\s*<.*>",
r"<[A-Za-z, ]+>",
r"\b'\w+\b",
r"&'\w+",
r"<'\w+>",
r"for<'\w+>",
r"macro_rules!\s*\w+",
r"\w+!\s*\(",
r"\blog!\s*\(",
r"\bdbg!\s*\(",
r"\bprintln!\s*\(",
r"\bassert!\s*\(",
r"log::\w+\(",
r"Result<[^>]+>",
r"Option<[^>]+>",
r"match\s+\w+\s*\{",
r"mod\s+tests\s*\{",
r"async\s+fn\s+\w+",
r"await\s*;?",
r"std::thread::spawn",
r"tokio::spawn",
r"match\s+.+\s*\{",
r"if\s+let\s+Some\(",
r"while\s+let\s+Some\(",
r"//.*",
r"/\*.*?\*/",
r"//\s*TODO",
r"//\s*FIXME",
r"//\s*HACK",
r"unsafe\s*\{",
r"<'\w+,\s*'\w+>",
r"for<'\w+>",
r"&'\w+\s*\w+",
r"where\s+",
r"T:\s*\w+",
r"dyn\s+\w+",
r"Box<dyn\s+\w+>",
r"impl\s+Trait",
r"temp\w*",
r"foo|bar|baz",
r"let\s+mut\s+\w+",
];

-fn collect_rust_code() -> Vec<String> {
-    let files = read_dir("/Users/macronova/Desktop/rust-stack")
-        .expect("Directory should exist")
-        .collect::<Result<Vec<_>, _>>()
-        .expect("Files should be present");
-    let parquet_batches = files
-        .into_iter()
-        .progress()
-        .flat_map(|entry| {
-            ParquetRecordBatchReaderBuilder::try_new(
-                File::open(entry.path()).expect("File should be readable"),
-            )
-            .expect("Parquet file should be present")
-            .build()
-            .expect("Parquet file should be readable")
-        })
-        .collect::<Result<Vec<_>, _>>()
-        .expect("Parquet file should be valid");
-    parquet_batches
-        .into_par_iter()
-        .progress()
-        .flat_map(|batch| {
-            batch
-                .column_by_name("content")
-                .expect("Content column should be present")
-                .as_string::<i32>()
-                .iter()
-                .map(|os| os.unwrap_or_default().to_string())
-                .collect::<Vec<_>>()
-        })
-        .filter(|code| code.len() < MAX_CODE_LENGTH)
-        .collect()
-}
-
async fn bench_fts_query((reader, pattern): (FullTextIndexReader<'_>, &str)) {
    reader
        .search(pattern)
@@ -76,7 +169,14 @@ async fn bench_literal_expr((reader, pattern): (FullTextIndexReader<'_>, ChromaR

fn bench_literal(criterion: &mut Criterion) {
    let runtime = tokio_multi_thread();
-    let source_code_chunk = collect_rust_code();
+    let source_code_chunk = runtime.block_on(async {
+        TheStackDedupRust::init()
+            .await
+            .expect("the-stack-dedup-rust dataset should be initializable")
+            .documents()
+            .await
+            .expect("the dataset should contain documents")
+    });

    let temp_dir = tempdir().expect("Temporary directory should be creatable");
    let storage = Storage::Local(LocalStorage::new(
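The bench compiles REGEX_PATTERNS through `ChromaRegex`, but the patterns appear to use standard regex-crate syntax (including the `(?m)` flag), so they can be sanity-checked in isolation with the `regex` crate (an illustrative sketch, not part of the PR):

```rust
use regex::Regex;

fn main() {
    // A small Rust snippet to probe a few of the new patterns against.
    let sample = r#"
pub async fn fetch(url: &str) -> Result<String, Error> {
    let body = reqwest::get(url).await?.text().await?; // TODO: retries
    Ok(body)
}
"#;
    for pattern in [
        r"(?m)^\s*pub\s+async\s+fn\s+\w+", // multiline anchor on fn definitions
        r"Result<[^>]+>",                  // generic return types
        r"//\s*TODO",                      // comment markers
    ] {
        let re = Regex::new(pattern).expect("pattern should compile");
        println!("{pattern}: {} match(es)", re.find_iter(sample).count());
    }
}
```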
@@ -47,12 +47,12 @@ async fn test_k8s_integration_03_initialized_append_succeeds() {
        seq_no: FragmentSeqNo(1),
        start: 1,
        limit: 2,
-        num_bytes: 1187,
+        num_bytes: 1044,
        data: vec![(position, vec![42, 43, 44, 45])],
    };
    let postconditions = [
        Condition::Manifest(ManifestCondition {
-            acc_bytes: 1187,
+            acc_bytes: 1044,
            writer: "test writer".to_string(),
            snapshots: vec![],
            fragments: vec![fragment1.clone()],
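The expected `num_bytes`/`acc_bytes` values drop from 1187 to 1044, presumably because the arrow/parquet upgrade changes the serialized fragment size (an inference from the diff).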