-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcsv-split.h
More file actions
154 lines (121 loc) · 3.58 KB
/
csv-split.h
File metadata and controls
154 lines (121 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#ifndef __CSV_SPLIT_H
#define __CSV_SPLIT_H
#include "csv-buf.h"
#include "queue.h"
#include <getopt.h>
#include "csv.h"
/**
* Version number
*/
#define CSV_SPLIT_VERSION "0.1.1"
/**
* Default IO thread count and sane min/max values
*/
#define IO_THREADS_DEFAULT 1
#define IO_THREADS_MIN 1
#define IO_THREADS_MAX 10
/**
* How much of a backlog to allow in our IO queue
*/
#define BG_QUEUE_MAX 20
/**
* The CSV realloc size, we're being aggressive here
*/
#define CSV_BLK_SIZE 1024
/**
* Initial size of our passthrough buffer, be aggressive
*/
#define BUFFER_SIZE 1024*1000*10
/**
* How much data to read at a time
*/
#define READ_BUF_SIZE 32768
/**
* Environment variable for payload file
*/
#define ENV_PAYLOAD_VAR "CSV_PAYLOAD_FILE"
#define ENV_ROWCOUNT_VAR "CSV_ROWCOUNT"
// Context we'll need for our split operation
struct csv_context {
// Our input file and output path
char in_file[255], out_path[255];
// The prefix to use when we split
const char *in_prefix;
// Trigger command to run when a chunk is done
char trigger_cmd[255];
// Should we read from stdin?
int from_stdin;
// Which part are we on
unsigned int on_file;
// The number of rows, and our current column
unsigned long row, col;
// The maximum number of rows per file
unsigned long max_rows;
/**
* Mark our overflow position here, which lets us specify that
* we've gone past our maximum row count but may need to in order
* to keep 'group together' column data together
*/
size_t opos;
/**
* "group together" column, meaning that we will never split rows
* with the same value for this column apart. We assume the
* rows are sorted by this column
*/
int gcol;
/**
* GZIP compression level (zero for none)
*/
int gzip;
/**
* Our header injection flag as well as the length of the header once
* we find it. This will be used to copy the header to each split file
* as we go. If count_header is set, we'll count it in the number of
* output rows, otherwise we will not count the header (default).
*/
unsigned short use_header;
unsigned short count_header;
unsigned int header_len;
// Simple flag to let us know if we should put a comma
unsigned int put_comma;
// The last group column we encountered, so we can detect when it changes
cbuf gcol_buf;
// A buffer we're using and re-using to write the CSV output data
cbuf csv_buf;
// Our blocking, thread-safe, IO queue
fqueue io_queue;
// The number of threads we're using, and storage for them
unsigned int thread_count;
pthread_t *io_threads;
// Our CSV parser
struct csv_parser parser;
};
/**
* An item with enough information for our IO consumers to write to disk
*/
struct q_flush_item {
// The filename where we'll write data
char out_file[255];
// Our trigger command
const char *trigger_cmd;
// Our row count
unsigned long row_count;
// The data we'll be writing
char *str;
// The data length
size_t len;
// gzip compression level (zero for none)
int gzip;
};
static const struct option g_long_opts[] = {
{ "group-col", required_argument, NULL, 'g' },
{ "num-rows", required_argument, NULL, 'n' },
{ "io-threads", optional_argument, NULL, 'i'},
{ "stdin", no_argument, NULL, 0 },
{ "trigger", required_argument, NULL, 't'},
{ "version", no_argument, NULL, 'v'},
{ "gzip", optional_argument, NULL, 'z'},
{ "header", optional_argument, NULL, 'd'},
{ 0, 0, 0, 0}
};
#endif