16
16
17
17
package io .cdap .plugin .batch .source ;
18
18
19
+ import com .github .pjfanning .xlsx .StreamingReader ;
19
20
import com .google .common .base .Preconditions ;
20
21
import com .google .common .base .Strings ;
21
22
import org .apache .hadoop .conf .Configuration ;
26
27
import org .apache .hadoop .io .Text ;
27
28
import org .apache .hadoop .mapreduce .InputSplit ;
28
29
import org .apache .hadoop .mapreduce .Job ;
30
+ import org .apache .hadoop .mapreduce .JobContext ;
29
31
import org .apache .hadoop .mapreduce .RecordReader ;
30
32
import org .apache .hadoop .mapreduce .TaskAttemptContext ;
31
33
import org .apache .hadoop .mapreduce .lib .input .FileSplit ;
32
34
import org .apache .hadoop .mapreduce .lib .input .TextInputFormat ;
33
- import org .apache .poi .hssf .usermodel .HSSFDateUtil ;
35
+ import org .apache .poi .EmptyFileException ;
36
+ import org .apache .poi .poifs .filesystem .FileMagic ;
34
37
import org .apache .poi .ss .usermodel .Cell ;
38
+ import org .apache .poi .ss .usermodel .DateUtil ;
35
39
import org .apache .poi .ss .usermodel .Row ;
36
40
import org .apache .poi .ss .usermodel .Sheet ;
37
41
import org .apache .poi .ss .usermodel .Workbook ;
38
42
import org .apache .poi .ss .usermodel .WorkbookFactory ;
39
43
import org .apache .poi .ss .util .CellReference ;
40
44
41
45
import java .io .IOException ;
46
+ import java .io .InputStream ;
42
47
import java .util .Iterator ;
43
48
44
49
@@ -67,6 +72,11 @@ public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, Tas
67
72
return new ExcelRecordReader ();
68
73
}
69
74
75
// Reports every input file as non-splittable: this override returns false
// unconditionally, regardless of the job context or file path passed in.
// NOTE(review): presumably so that each workbook is read whole by a single record reader - confirm.
+ @ Override
76
+ public boolean isSplitable (JobContext context , Path file ) {
77
+ return false ;
78
+ }
79
+
70
80
public static void setConfigurations (Job job , String filePattern , String sheet , boolean reprocess ,
71
81
String sheetValue , String columnList , boolean skipFirstRow ,
72
82
String terminateIfEmptyRow , String rowLimit , String ifErrorRecord ,
@@ -145,9 +155,31 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
145
155
String sheet = job .get (SHEET );
146
156
String sheetValue = job .get (SHEET_VALUE );
147
157
148
- Sheet workSheet ; // sheet can be used as common for XSSF and HSSF workbook
158
+ Sheet workSheet ;
159
+ Workbook workbook ;
160
+ boolean isStreaming = false ;
149
161
try {
150
- Workbook workbook = WorkbookFactory .create (fileIn );
162
+ // Use Magic Bytes to detect the file type
163
+ InputStream is = FileMagic .prepareToCheckMagic (fileIn );
164
+ byte [] emptyFileCheck = new byte [1 ];
165
+ is .mark (emptyFileCheck .length );
166
+ if (is .read (emptyFileCheck ) < emptyFileCheck .length ) {
167
+ throw new EmptyFileException ();
168
+ }
169
+ is .reset ();
170
+
171
+ final FileMagic fm = FileMagic .valueOf (is );
172
+ switch (fm ) {
173
+ case OOXML :
174
+ workbook = StreamingReader .builder ().rowCacheSize (10 ).open (is );
175
+ isStreaming = true ;
176
+ break ;
177
+ case OLE2 :
178
+ workbook = WorkbookFactory .create (is );
179
+ break ;
180
+ default :
181
+ throw new IOException ("Can't open workbook - unsupported file type: " + fm );
182
+ }
151
183
if (sheet .equalsIgnoreCase (SHEET_NAME )) {
152
184
workSheet = workbook .getSheet (sheetValue );
153
185
} else {
@@ -157,7 +189,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
157
189
throw new IllegalArgumentException ("Exception while reading excel sheet. " + e .getMessage (), e );
158
190
}
159
191
160
- rowCount = job .getInt (ROWS_LIMIT , workSheet .getPhysicalNumberOfRows ());
192
+ // The number of rows in a sheet cannot be determined while streaming,
193
+ // so -1 is used as rowCount to indicate that all rows should be read.
194
+ rowCount = job .getInt (ROWS_LIMIT , isStreaming ? -1 : workSheet .getPhysicalNumberOfRows ());
161
195
rows = workSheet .iterator ();
162
196
lastRowNum = workSheet .getLastRowNum ();
163
197
rowIdx = 0 ;
@@ -171,7 +205,7 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
171
205
}
172
206
173
207
@ Override
174
- public boolean nextKeyValue () throws IOException , InterruptedException {
208
+ public boolean nextKeyValue () {
175
209
if (!rows .hasNext () || rowCount == 0 ) {
176
210
return false ;
177
211
}
@@ -200,18 +234,18 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
200
234
Cell cell = cellIterator .next ();
201
235
String colName = CellReference .convertNumToColString (cell .getColumnIndex ());
202
236
switch (cell .getCellType ()) {
203
- case Cell . CELL_TYPE_STRING :
237
+ case STRING :
204
238
sb .append (colName )
205
239
.append (COLUMN_SEPERATOR ).append (cell .getStringCellValue ()).append (CELL_SEPERATOR );
206
240
break ;
207
241
208
- case Cell . CELL_TYPE_BOOLEAN :
242
+ case BOOLEAN :
209
243
sb .append (colName )
210
244
.append (COLUMN_SEPERATOR ).append (cell .getBooleanCellValue ()).append (CELL_SEPERATOR );
211
245
break ;
212
246
213
- case Cell . CELL_TYPE_NUMERIC :
214
- if (HSSFDateUtil .isCellDateFormatted (cell )) {
247
+ case NUMERIC :
248
+ if (DateUtil .isCellDateFormatted (cell )) {
215
249
sb .append (colName ).append (COLUMN_SEPERATOR ).append (cell .getDateCellValue ()).append (CELL_SEPERATOR );
216
250
} else {
217
251
sb .append (colName )
0 commit comments