Skip to content

Commit c7d631e

Browse files
committed
HADOOP-19400: Expand specification and contract test coverage for InputStream reads.
Closes #7367 Signed-off-by: Anuj Modi <[email protected]> Signed-off-by: Steve Loughran <[email protected]> (cherry picked from commit 32da0d6)
1 parent 2789fa7 commit c7d631e

File tree

3 files changed

+207
-4
lines changed

3 files changed

+207
-4
lines changed

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,11 @@ as implicitly set in `pos`.
129129
#### Preconditions
130130

131131
isOpen(FSDIS)
132-
buffer != null else raise NullPointerException
133-
length >= 0
134-
offset < len(buffer)
135-
length <= len(buffer) - offset
132+
buffer != null else raise NullPointerException, IllegalArgumentException
133+
offset >= 0 else raise IndexOutOfBoundsException
134+
length >= 0 else raise IndexOutOfBoundsException, IllegalArgumentException
135+
offset < len(buffer) else raise IndexOutOfBoundsException
136+
length <= len(buffer) - offset else raise IndexOutOfBoundsException
136137
pos >= 0 else raise EOFException, IOException
137138

138139
Exceptions that may be raised on precondition failure are
@@ -175,6 +176,19 @@ must block until at least one byte is returned. Thus, for any data source
175176
of length greater than zero, repeated invocations of this `read()` operation
176177
will eventually read all the data.
177178

179+
#### Implementation Notes
180+
181+
1. If the caller passes a `null` buffer, then an unchecked exception MUST be thrown. The base JDK
182+
`InputStream` implementation throws `NullPointerException`. HDFS historically used
183+
`IllegalArgumentException`. Implementations MAY use either of these.
184+
1. If the caller passes a negative value for `length`, then an unchecked exception MUST be thrown.
185+
The base JDK `InputStream` implementation throws `IndexOutOfBoundsException`. HDFS historically used
186+
`IllegalArgumentException`. Implementations MAY use either of these.
187+
1. Reads through any method MUST return the same data.
188+
1. Callers MAY interleave calls to different read methods (single-byte and multi-byte) on the same
189+
stream. The stream MUST return the same underlying data, regardless of the specific read calls or
190+
their ordering.
191+
178192
### <a name="Seekable.seek"></a>`Seekable.seek(s)`
179193

180194

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
3939
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4040
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
41+
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
42+
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDatasetSingleByteReads;
43+
import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
4144
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
4245
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
4346
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -408,4 +411,137 @@ public void testFloatingPointLength() throws Throwable {
408411
.isEqualTo(len);
409412
}
410413

414+
@Test
415+
public void testInputStreamReadNullBuffer() throws Throwable {
416+
// The JDK base InputStream (and by extension LocalFSFileInputStream) throws
417+
// NullPointerException. Historically, DFSInputStream has thrown IllegalArgumentException
418+
// instead. Allow either behavior.
419+
describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
420+
"NullPointerException");
421+
Path path = methodPath();
422+
FileSystem fs = getFileSystem();
423+
int len = 4096;
424+
createFile(fs, path, true,
425+
dataset(len, 0x40, 0x80));
426+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
427+
Assertions.assertThatThrownBy(() -> is.read(null, 0, 10))
428+
.isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class);
429+
}
430+
}
431+
432+
@Test
433+
public void testInputStreamReadNegativePosition() throws Throwable {
434+
describe("Attempting to read into a negative position should throw IndexOutOfBoundsException");
435+
Path path = methodPath();
436+
FileSystem fs = getFileSystem();
437+
int len = 4096;
438+
createFile(fs, path, true,
439+
dataset(len, 0x40, 0x80));
440+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
441+
Assertions.assertThatThrownBy(() -> is.read(new byte[10], -1, 10))
442+
.isInstanceOf(IndexOutOfBoundsException.class);
443+
}
444+
}
445+
446+
@Test
447+
public void testInputStreamReadNegativeLength() throws Throwable {
448+
// The JDK base InputStream (and by extension LocalFSFileInputStream) throws
449+
// IndexOutOfBoundsException. Historically, DFSInputStream has thrown IllegalArgumentException
450+
// instead. Allow either behavior.
451+
describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
452+
"IndexOutOfBoundsException");
453+
Path path = methodPath();
454+
FileSystem fs = getFileSystem();
455+
int len = 4096;
456+
createFile(fs, path, true,
457+
dataset(len, 0x40, 0x80));
458+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
459+
Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, -1))
460+
.isInstanceOfAny(IllegalArgumentException.class, IndexOutOfBoundsException.class);
461+
}
462+
}
463+
464+
@Test
465+
public void testInputStreamReadTooLong() throws Throwable {
466+
describe("Attempting a read longer than the buffer should throw IndexOutOfBoundsException");
467+
Path path = methodPath();
468+
FileSystem fs = getFileSystem();
469+
int len = 4096;
470+
createFile(fs, path, true,
471+
dataset(len, 0x40, 0x80));
472+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
473+
Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, 11))
474+
.isInstanceOf(IndexOutOfBoundsException.class);
475+
}
476+
}
477+
478+
@Test
479+
public void testInputStreamReadZeroLengthRead() throws Throwable {
480+
describe("Reading 0 bytes is a no-op");
481+
Path path = methodPath();
482+
FileSystem fs = getFileSystem();
483+
int len = 4096;
484+
createFile(fs, path, true,
485+
dataset(len, 0x40, 0x80));
486+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
487+
Assertions.assertThat(is.read(new byte[10], 0, 0)).describedAs("bytes read").isEqualTo(0);
488+
}
489+
}
490+
491+
@Test
492+
public void testInputStreamConsistentEOF() throws Throwable {
493+
describe("Both single-byte and multi-byte read should report EOF after consuming stream");
494+
Path path = methodPath();
495+
FileSystem fs = getFileSystem();
496+
int len = 4096;
497+
createFile(fs, path, true,
498+
dataset(len, 0x40, 0x80));
499+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
500+
IOUtils.skipFully(is, len);
501+
Assertions.assertThat(is.read()).describedAs("single byte EOF").isEqualTo(-1);
502+
Assertions.assertThat(is.read(new byte[10], 0, 10)).describedAs("multi byte EOF")
503+
.isEqualTo(-1);
504+
}
505+
}
506+
507+
@Test
508+
public void testInputStreamSingleAndMultiByteReadsEqual() throws Throwable {
509+
describe("Single-byte and multi-byte read should return the same bytes");
510+
Path path = methodPath();
511+
FileSystem fs = getFileSystem();
512+
int len = 4096;
513+
createFile(fs, path, true,
514+
dataset(len, 0x40, 0x80));
515+
byte[] multiByteReads = readDataset(fs, path, len);
516+
byte[] singleByteReads = readDatasetSingleByteReads(fs, path, len);
517+
compareByteArrays(multiByteReads, singleByteReads, len);
518+
}
519+
520+
@Test
521+
public void testInputStreamMixedSingleAndMultiByteReadsEqual() throws Throwable {
522+
describe("Mixed single and multi-byte reads on the same stream should return the same bytes");
523+
Path path = methodPath();
524+
FileSystem fs = getFileSystem();
525+
int len = 4096;
526+
createFile(fs, path, true,
527+
dataset(len, 0x40, 0x80));
528+
byte[] expected = readDataset(fs, path, len);
529+
byte[] actual = new byte[len];
530+
int readSize = 128;
531+
try (FSDataInputStream is = fs.openFile(path).build().get()) {
532+
for (int offset = 0; offset < len; offset = offset + readSize + readSize) {
533+
if (readNBytes(is, actual, offset, readSize) != readSize) {
534+
fail("End of file reached before reading fully.");
535+
}
536+
for (int i = 0; i < readSize; ++i) {
537+
int nextByte = is.read();
538+
if (-1 == nextByte) {
539+
fail("End of file reached before reading fully.");
540+
}
541+
actual[offset + readSize + i] = (byte)nextByte;
542+
}
543+
}
544+
}
545+
compareByteArrays(expected, actual, len);
546+
}
411547
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ public static void writeDataset(FileSystem fs, Path path, byte[] src,
219219
* Read the file and convert to a byte dataset.
220220
* This implements readfully internally, so that it will read
221221
* in the file without ever having to seek()
222+
* The read is performed through a sequence of calls to multi-byte
223+
* {@link InputStream#read(byte[], int, int)}.
222224
* @param fs filesystem
223225
* @param path path to read from
224226
* @param len length of data to read
@@ -242,6 +244,57 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
242244
return dest;
243245
}
244246

247+
/**
248+
* Read a file completely and return the contents in a byte buffer. The read is performed through
249+
* repeated calls to single-byte {@link InputStream#read()}.
250+
* @param fs filesystem
251+
* @param path path to read from
252+
* @param len length of data to read
253+
* @return the bytes
254+
* @throws IOException IO problems
255+
*/
256+
public static byte[] readDatasetSingleByteReads(FileSystem fs, Path path, int len)
257+
throws IOException {
258+
byte[] dest = new byte[len];
259+
try (FSDataInputStream in = fs.open(path)) {
260+
for (int i = 0; i < len; ++i) {
261+
int nextByte = in.read();
262+
if (-1 == nextByte) {
263+
throw new EOFException("End of file reached before reading fully.");
264+
}
265+
dest[i] = (byte)nextByte;
266+
}
267+
}
268+
return dest;
269+
}
270+
271+
/**
272+
* Reads {@code len} bytes into offset {@code off} of target buffer {@code b}, up to EOF,
273+
* potentially blocking until enough bytes are available on the stream. This is similar to
274+
* {@code InputStream#readNBytes(int)} introduced in Java 11, except the caller owns allocation of
275+
* the target buffer.
276+
* @param is input stream to read
277+
* @param b target buffer
278+
* @param off offset within {@code b}
279+
* @param len length of data to read
280+
* @return number of bytes read
281+
* @throws IOException IO problems
282+
*/
283+
public static int readNBytes(InputStream is, byte[] b, int off, int len)
284+
throws IOException {
285+
int totalBytes = 0;
286+
while (len > 0) {
287+
int nbytes = is.read(b, off, len);
288+
if (nbytes < 0) {
289+
break;
290+
}
291+
totalBytes += nbytes;
292+
off += nbytes;
293+
len -= nbytes;
294+
}
295+
return totalBytes;
296+
}
297+
245298
/**
246299
* Read a file, verify its length and contents match the expected array.
247300
* @param fs filesystem

0 commit comments

Comments
 (0)