Skip to content

Commit

Permalink
HADOOP-19400: Expand specification and contract test coverage for Inp…
Browse files Browse the repository at this point in the history
…utStream reads.
  • Loading branch information
cnauroth committed Feb 7, 2025
1 parent 872ebda commit 3a8608b
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,24 @@ must block until at least one byte is returned. Thus, for any data source
of length greater than zero, repeated invocations of this `read()` operation
will eventually read all the data.

#### Implementation Notes

1. If the caller passes a `null` buffer, then an unchecked exception is thrown. The base JDK
`InputStream` implementation throws `NullPointerException`. HDFS historically used
`IllegalArgumentException`. Either of these are acceptable.
1. If the caller passes a negative value for `offset`, then `IndexOutOfBoundsException` is thrown.
1. If the caller passes a negative value for `length`, then an unchecked exception is thrown. The
base JDK `InputStream` implementation throws `IndexOutOfBoundsException`. HDFS historically used
`IllegalArgumentException`. Either of these are acceptable.
1. If the caller passes an `offset + length` that would run past the length of `buffer`, then
`IndexOutOfBoundsException` is thrown.
1. A read of `length` 0 is a no-op, and the returned `result` is 0. No exception is thrown, assuming
all other arguments are valid.
1. Reads through any method are expected to return the same data.
1. Callers may interleave calls to different read methods (single-byte and multi-byte) on the same
stream. The expectation is to receive the same underlying data, regardless of the specific read
calls or their ordering.

### <a name="Seekable.seek"></a>`Seekable.seek(s)`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDatasetSingleByteReads;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
Expand Down Expand Up @@ -408,4 +411,137 @@ public void testFloatingPointLength() throws Throwable {
.isEqualTo(len);
}

@Test
public void testInputStreamReadNullBuffer() throws Throwable {
// The JDK base InputStream (and by extension LocalFSFileInputStream) throws
// NullPointerException. Historically, DFSInputStream has thrown IllegalArgumentException
// instead. Allow either behavior.
describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
"NullPointerException");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
Assertions.assertThatThrownBy(() -> is.read(null, 0, 10))
.isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class);
}
}

@Test
public void testInputStreamReadNegativePosition() throws Throwable {
describe("Attempting to read into a negative position should throw IndexOutOfBoundsException");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
Assertions.assertThatThrownBy(() -> is.read(new byte[10], -1, 10))
.isInstanceOf(IndexOutOfBoundsException.class);
}
}

@Test
public void testInputStreamReadNegativeLength() throws Throwable {
// The JDK base InputStream (and by extension LocalFSFileInputStream) throws
// IndexOutOfBoundsException. Historically, DFSInputStream has thrown IllegalArgumentException
// instead. Allow either behavior.
describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
"IndexOutOfBoundsException");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, -1))
.isInstanceOfAny(IllegalArgumentException.class, IndexOutOfBoundsException.class);
}
}

@Test
public void testInputStreamReadTooLong() throws Throwable {
describe("Attempting a read longer than the buffer should throw IndexOutOfBoundsException");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, 11))
.isInstanceOf(IndexOutOfBoundsException.class);
}
}

@Test
public void testInputStreamReadZeroLengthRead() throws Throwable {
describe("Reading 0 bytes is a no-op");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
Assertions.assertThat(is.read(new byte[10], 0, 0)).describedAs("bytes read").isEqualTo(0);
}
}

@Test
public void testInputStreamConsistentEOF() throws Throwable {
describe("Both single-byte and multi-byte read should report EOF after consuming stream");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
try (FSDataInputStream is = fs.openFile(path).build().get()) {
IOUtils.skipFully(is, len);
Assertions.assertThat(is.read()).describedAs("single byte EOF").isEqualTo(-1);
Assertions.assertThat(is.read(new byte[10], 0, 10)).describedAs("multi byte EOF")
.isEqualTo(-1);
}
}

@Test
public void testInputStreamSingleAndMultiByteReadsEqual() throws Throwable {
describe("Single-byte and multi-byte read should return the same bytes");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
byte[] multiByteReads = readDataset(fs, path, len);
byte[] singleByteReads = readDatasetSingleByteReads(fs, path, len);
compareByteArrays(multiByteReads, singleByteReads, len);
}

@Test
public void testInputStreamMixedSingleAndMultiByteReadsEqual() throws Throwable {
describe("Mixed single and multi-byte reads on the same stream should return the same bytes");
Path path = methodPath();
FileSystem fs = getFileSystem();
int len = 4096;
createFile(fs, path, true,
dataset(len, 0x40, 0x80));
byte[] expected = readDataset(fs, path, len);
byte[] actual = new byte[len];
int readSize = 128;
try (FSDataInputStream is = fs.openFile(path).build().get()) {
for (int offset = 0; offset < len; offset = offset + readSize + readSize) {
if (readNBytes(is, actual, offset, readSize) != readSize) {
fail("End of file reached before reading fully.");
}
for (int i = 0; i < readSize; ++i) {
int nextByte = is.read();
if (-1 == nextByte) {
fail("End of file reached before reading fully.");
}
actual[offset + readSize + i] = (byte)nextByte;
}
}
}
compareByteArrays(expected, actual, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public static void writeDataset(FileSystem fs, Path path, byte[] src,
* Read the file and convert to a byte dataset.
* This implements readfully internally, so that it will read
* in the file without ever having to seek()
* The read is performed through a sequence of calls to multi-byte
* {@link InputStream#read(byte[], int, int)}.
* @param fs filesystem
* @param path path to read from
* @param len length of data to read
Expand All @@ -242,6 +244,57 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
return dest;
}

/**
* Read a file completely and return the contents in a byte buffer. The read is performed through
* repeated calls to single-byte {@link InputStream#read()}.
* @param fs filesystem
* @param path path to read from
* @param len length of data to read
* @return the bytes
* @throws IOException IO problems
*/
public static byte[] readDatasetSingleByteReads(FileSystem fs, Path path, int len)
throws IOException {
byte[] dest = new byte[len];
try (FSDataInputStream in = fs.open(path)) {
for (int i = 0; i < len; ++i) {
int nextByte = in.read();
if (-1 == nextByte) {
throw new EOFException("End of file reached before reading fully.");
}
dest[i] = (byte)nextByte;
}
}
return dest;
}

/**
* Reads {@code len} bytes into offset {@code off} of target buffer {@code b}, up to EOF,
* potentially blocking until enough bytes are available on the stream. This is similar to
* {@code InputStream#readNBytes(int)} introduced in Java 11, except the caller owns allocation of
* the target buffer.
* @param is input stream to read
* @param b target buffer
* @param off offset within {@code b}
* @param len length of data to read
* @return number of bytes read
* @throws IOException IO problems
*/
public static int readNBytes(InputStream is, byte[] b, int off, int len)
throws IOException {
int totalBytes = 0;
while (len > 0) {
int nbytes = is.read(b, off, len);
if (nbytes < 0) {
break;
}
totalBytes += nbytes;
off += nbytes;
len -= nbytes;
}
return totalBytes;
}

/**
* Read a file, verify its length and contents match the expected array.
* @param fs filesystem
Expand Down

0 comments on commit 3a8608b

Please sign in to comment.