Skip to content

Commit

Permalink
encoding.csv: add support for multithreading to `encoding.csv.RandomA…
Browse files Browse the repository at this point in the history
…ccessReader` (#23677)
  • Loading branch information
penguindark authored Feb 9, 2025
1 parent f3493e1 commit 089778e
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 21 deletions.
72 changes: 51 additions & 21 deletions vlib/encoding/csv/csv_reader_random_access.v
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mut:
htype ColumType = .string
}

@[heap]
pub struct RandomAccessReader {
pub mut:
index i64
Expand Down Expand Up @@ -71,7 +72,8 @@ pub mut:
mem_buf_start i64 = -1 // start index in the file of the read buffer
mem_buf_end i64 = -1 // end index in the file of the read buffer
// csv map for quick access
csv_map [][]i64
create_map_csv bool = true // flag to enable the csv map creation
csv_map [][]i64
// header
header_row int = -1 // row index of the header in the csv_map
header_list []HeaderItem // list of the header item
Expand All @@ -81,19 +83,20 @@ pub mut:
@[params]
pub struct RandomAccessReaderConfig {
pub:
scr_buf voidptr // pointer to the buffer of data
scr_buf_len i64 // if > 0 use the RAM pointed from scr_buf as source of data
file_path string
start_index i64
end_index i64 = -1
mem_buf_size int = 1024 * 64 // default buffer size 64KByte
separator u8 = `,`
comment u8 = `#` // every line that start with the quote char is ignored
default_cell string = '*' // return this string if out of the csv boundaries
empty_cell string // return this string if empty cell
end_line_len int = endline_cr_len // size of the endline rune
quote u8 = `"` // double quote is the standard quote char
quote_remove bool // if true clear the cell from the quotes
scr_buf voidptr // pointer to the buffer of data
scr_buf_len i64 // if > 0 use the RAM pointed from scr_buf as source of data
file_path string
start_index i64
end_index i64 = -1
mem_buf_size int = 1024 * 64 // default buffer size 64KByte
separator u8 = `,`
comment u8 = `#` // every line that start with the quote char is ignored
default_cell string = '*' // return this string if out of the csv boundaries
empty_cell string // return this string if empty cell
end_line_len int = endline_cr_len // size of the endline rune
quote u8 = `"` // double quote is the standard quote char
quote_remove bool // if true clear the cell from the quotes
create_map_csv bool = true // if true make the map of the csv file
}

/******************************************************************************
Expand Down Expand Up @@ -177,7 +180,10 @@ pub fn csv_reader(cfg RandomAccessReaderConfig) !&RandomAccessReader {
cr.quote_remove = cfg.quote_remove
cr.quote = cfg.quote

cr.map_csv()!
cr.create_map_csv = cfg.create_map_csv
if cr.create_map_csv {
cr.map_csv()!
}

return cr
}
Expand Down Expand Up @@ -226,6 +232,18 @@ fn (mut cr RandomAccessReader) fill_buffer(i i64) !i64 {
return i64(-1)
}

// copy_configuration copies the parsed header state (header_row, header_list,
// header_map) and the csv row/column index map from `src_cr` into `cr`.
// It is a helper for multi-threaded use: secondary readers created with
// `create_map_csv: false` can reuse the map already built by a first reader
// instead of re-scanning the whole file.
// NOTE(review): the unsafe block makes `cr` alias `src_cr`'s arrays by
// reference instead of deep-copying them — both readers then share the same
// backing memory, which is what makes this cheap.
// pay attention to the free process: freeing the shared fields through one
// reader invalidates them in every reader that copied this configuration.
pub fn (mut cr RandomAccessReader) copy_configuration(src_cr RandomAccessReader) {
cr.header_row = src_cr.header_row
// aliasing, not copying: valid only while src_cr's data stays alive
unsafe {
cr.header_list = &src_cr.header_list
cr.header_map = &src_cr.header_map
cr.csv_map = &src_cr.csv_map
}
}

/******************************************************************************
*
* Csv mapper, mapped reader
Expand All @@ -248,8 +266,15 @@ pub fn (mut cr RandomAccessReader) map_csv() ! {
p := &u8(cr.mem_buf)
cr.csv_map << []i64{}
cr.csv_map[0] << if cr.is_bom_present { 3 } else { 0 } // skip the BOM data

// mut counter := i64(0)
for i < cr.end_index {
read_bytes_count := cr.fill_buffer(i)!

// DEBUG print
// perc := f32(counter) / f32(cr.end_index) * 100.0
// println("${perc:.2f}")

// println("${i:-12d} of ${cr.f_len:-12d} readed: ${read_bytes_count}")
mut p1 := p
mut i1 := i64(0)
Expand Down Expand Up @@ -317,6 +342,7 @@ pub fn (mut cr RandomAccessReader) map_csv() ! {
}
}
i += read_bytes_count
// counter += i1
}
}
// remove last row if it is not a valid one
Expand Down Expand Up @@ -428,13 +454,17 @@ pub fn (mut cr RandomAccessReader) get_cellt(cfg GetCellConfig) !CellValue {
if cr.header_row >= 0 && cfg.x < cr.header_list.len {
h := cr.header_list[cfg.x]
res := cr.get_cell(cfg)!
if h.htype == .int {
return res.int()
}
if h.htype == .f32 {
return res.f32()
match h.htype {
.int {
return res.trim_space().int()
}
.string {
return res
}
.f32 {
return res.trim_space().f32()
}
}
return res
}
return cr.get_cell(cfg)!
}
Expand Down
123 changes: 123 additions & 0 deletions vlib/encoding/csv/csv_reader_test.v
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Known limitations:
import encoding.csv
import strings
import os
import rand

/******************************************************************************
*
Expand Down Expand Up @@ -347,3 +348,125 @@ fn test_coherence() {
fn main() {
test_csv_string()
}

// Multithreaded tests

// create_csv writes a synthetic csv test file with `size` data rows to
// `file_path` and returns the sum 0 + 1 + ... + (size - 1), i.e. the sum of
// the generated `count` column, which callers use as an integrity checksum.
// Returns an error if the file cannot be created or written.
fn create_csv(file_path string, size int) !i64 {
	// header row of the generated file (6 columns)
	csv_txt := 'pippo,count,count1,pera,sempronio,float'

	mut f := os.open_file(file_path, 'w')!
	f.write_string(csv_txt + '\n')!
	mut count := i64(0)
	for i in 0 .. size {
		// col 0: random int, col 1: the row index (checked by the caller),
		// col 2: constant, cols 3-4: quoted strings, col 5: random float
		tmp := "${rand.int()}, ${i}, 3, \"txt1${i}\", \"txt2${i}\", ${f32(rand.u32()) / 1000.0}\n"
		f.write_string(tmp)!
		count += i
	}
	f.close()
	return count
}

// read_lines reads rows [start_row, end_row) from `csvr` into the shared,
// column-major `data` matrix: data[column][row - 1] receives the typed cell
// (row 0 is assumed to be the header, hence the `- 1` shift).
// It is meant to be spawned once per slice of the file, each worker writing
// a disjoint row range so the unsynchronized writes never overlap.
// `id` is only a worker identifier used by the commented-out debug prints.
// NOTE(review): the whole body runs in `unsafe` because workers mutate the
// shared `data` array without locks — sound only while callers keep the
// [start_row, end_row) ranges disjoint.
fn read_lines(id int, csvr csv.RandomAccessReader, mut data [][]csv.CellValue, start_row int, end_row int) {
// println(" func ${data.len},${data[1].len}")
unsafe {
// one pass per column; the header item decides how each cell is typed
for count, col_elem in csvr.header_list {
// println("Check: ${col_elem}")
match col_elem.htype {
.string {
// println('id:${id} String here')
for row_index in start_row .. end_row {
// println("str ${count},${row_index}")
// store the raw cell text as-is
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
panic('Str get_cell failed')
}
}
}
.int {
// println('id:${id} Int here')
for row_index in start_row .. end_row {
// println("int ${count},${row_index}")
// trim before parsing: generated cells carry a leading space
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
panic('Int get_cell failed')
}.trim_space().int()
}
}
.f32 {
// println('id:${id} f32 here')
for row_index in start_row .. end_row {
// println("f32 ${count},${row_index}")
data[count][row_index - 1] = csvr.get_cell(x: count, y: row_index) or {
panic('F32 get_cell failed')
}.trim_space().f32()
}
}
}
}
} // unsafe
}

// test_multithreading checks that several RandomAccessReader instances can
// read disjoint slices of the same csv file in parallel.
// Flow: generate a file with a known column checksum, build the csv map once
// with the first reader, share it with the secondary readers via
// copy_configuration, read every slice in its own thread, then verify the
// checksum of the integer column.
fn test_multithreading() {
file_path_str := os.join_path(os.temp_dir(), 'test_csv.csv')
size := 10_000

// create the test file; res_count is the expected sum of column 1
res_count := create_csv(file_path_str, size)!

slices := 2 // number of slice of the csv
mem_buf_size := 1024 * 1024 * 1

mut csvr := []csv.RandomAccessReader{}

// init first csv reader; this is the only reader that scans the file to
// build the csv map (create_map_csv defaults to true)
csvr << csv.csv_reader(file_path: file_path_str, mem_buf_size: mem_buf_size)!
csvr[0].build_header_dict(csv.GetHeaderConf{})!

// init other csv readers using the first reader configuration
// (create_map_csv: false skips the expensive mapping pass; the map is
// then shared by reference through copy_configuration)
for _ in 1 .. slices {
mut tmp_csvr := csv.csv_reader(
file_path: file_path_str
mem_buf_size: mem_buf_size
create_map_csv: false
)!
tmp_csvr.copy_configuration(csvr[0])
csvr << tmp_csvr
}

// read the data from the csv file
// column-major result matrix: data[column][row - 1]
mut data := [][]csv.CellValue{}

n_rows := csvr[0].csv_map.len
unsafe {
data = [][]csv.CellValue{len: csvr[0].header_list.len, init: []csv.CellValue{len: n_rows}}
}
// split rows [1, n_rows) into `slices` contiguous disjoint ranges;
// row 0 is the header and is skipped
step := n_rows / slices
mut start := 1
mut end := if (start + step) > n_rows { n_rows } else { start + step }

mut threads := []thread{}
for task_index in 0 .. slices {
// each worker gets its own reader so buffer state is not shared
threads << spawn read_lines(task_index, csvr[task_index], mut &data, start, end)
start = end
end = if (start + step) > n_rows { n_rows } else { start + step }
}
threads.wait()

// release the csv readers
// NOTE(review): the secondary readers alias csvr[0]'s arrays (see
// copy_configuration) — disposing every reader assumes
// dispose_csv_reader tolerates the shared fields; confirm against its
// implementation
for mut item in csvr {
item.dispose_csv_reader()
}

// check for the integer column sum
mut ck_count := i64(0)
for i in 0 .. csvr[0].csv_map.len - 1 {
ck_count += data[1][i] as int
}

assert ck_count == res_count, 'check on csv file failed!'

// remove the temp file
os.rm(file_path_str)!
}

0 comments on commit 089778e

Please sign in to comment.