Skip to content

Commit

Permalink
HADOOP-19400: Expand specification and contract test coverage for Inp…
Browse files Browse the repository at this point in the history
…utStream reads.
  • Loading branch information
cnauroth committed Feb 10, 2025
1 parent 872ebda commit 617d9b5
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ as implicitly set in `pos`.
#### Preconditions

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
buffer != null else raise NullPointerException, IllegalArgumentException
offset >= 0 else raise IndexOutOfBoundsException
length >= 0 else raise IndexOutOfBoundsException, IllegalArgumentException
offset < len(buffer) else raise IndexOutOfBoundsException
length <= len(buffer) - offset else raise IndexOutOfBoundsException
pos >= 0 else raise EOFException, IOException

Exceptions that may be raised on precondition failure are
Expand Down Expand Up @@ -175,6 +176,19 @@ must block until at least one byte is returned. Thus, for any data source
of length greater than zero, repeated invocations of this `read()` operation
will eventually read all the data.

#### Implementation Notes

1. If the caller passes a `null` buffer, then an unchecked exception MUST be thrown. The base JDK
`InputStream` implementation throws `NullPointerException`. HDFS historically used
`IllegalArgumentException`. Implementations MAY use either of these.
1. If the caller passes a negative value for `length`, then an unchecked exception MUST be thrown.
The base JDK `InputStream` implementation throws `IndexOutOfBoundsException`. HDFS historically used
`IllegalArgumentException`. Implementations MAY use either of these.
1. Reading the same bytes of a file through any of the read methods MUST return the same data.
1. Callers MAY interleave calls to different read methods (single-byte and multi-byte) on the same
stream. The stream MUST return the same underlying data, regardless of the specific read calls or
their ordering.

### <a name="Seekable.seek"></a>`Seekable.seek(s)`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDatasetSingleByteReads;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
Expand Down Expand Up @@ -408,4 +411,137 @@ public void testFloatingPointLength() throws Throwable {
.isEqualTo(len);
}

@Test
public void testInputStreamReadNullBuffer() throws Throwable {
  describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
      "NullPointerException");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final byte[] contents = dataset(4096, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // Two exception types are tolerated: NullPointerException is what the JDK base
    // InputStream (and therefore LocalFSFileInputStream) raises, while DFSInputStream
    // has historically raised IllegalArgumentException.
    Assertions.assertThatThrownBy(() -> in.read(null, 0, 10))
        .isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class);
  }
}

@Test
public void testInputStreamReadNegativePosition() throws Throwable {
  describe("Attempting to read into a negative position should throw IndexOutOfBoundsException");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final byte[] contents = dataset(4096, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // The negative value is the second argument of read(byte[], int, int),
    // i.e. the offset into the destination buffer.
    final byte[] target = new byte[10];
    Assertions.assertThatThrownBy(() -> in.read(target, -1, 10))
        .isInstanceOf(IndexOutOfBoundsException.class);
  }
}

@Test
public void testInputStreamReadNegativeLength() throws Throwable {
  // The JDK base InputStream (and by extension LocalFSFileInputStream) throws
  // IndexOutOfBoundsException. Historically, DFSInputStream has thrown IllegalArgumentException
  // instead. Allow either behavior.
  // The description names the negative length under test.
  describe("Attempting to read a negative length should throw IllegalArgumentException or " +
      "IndexOutOfBoundsException");
  Path path = methodPath();
  FileSystem fs = getFileSystem();
  int len = 4096;
  createFile(fs, path, true,
      dataset(len, 0x40, 0x80));
  try (FSDataInputStream is = fs.openFile(path).build().get()) {
    // Valid buffer and offset; only the length (-1) violates the preconditions.
    Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, -1))
        .isInstanceOfAny(IllegalArgumentException.class, IndexOutOfBoundsException.class);
  }
}

@Test
public void testInputStreamReadTooLong() throws Throwable {
  describe("Attempting a read longer than the buffer should throw IndexOutOfBoundsException");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final byte[] contents = dataset(4096, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // Ask for 11 bytes while supplying a buffer that only holds 10.
    final byte[] target = new byte[10];
    Assertions.assertThatThrownBy(() -> in.read(target, 0, 11))
        .isInstanceOf(IndexOutOfBoundsException.class);
  }
}

@Test
public void testInputStreamReadZeroLengthRead() throws Throwable {
  describe("Reading 0 bytes is a no-op");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final byte[] contents = dataset(4096, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // A request for zero bytes must report zero bytes read.
    final int bytesRead = in.read(new byte[10], 0, 0);
    Assertions.assertThat(bytesRead)
        .describedAs("bytes read")
        .isEqualTo(0);
  }
}

@Test
public void testInputStreamConsistentEOF() throws Throwable {
  describe("Both single-byte and multi-byte read should report EOF after consuming stream");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final int fileLength = 4096;
  final byte[] contents = dataset(fileLength, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // Consume the whole file so that every subsequent read starts at the end of the data.
    IOUtils.skipFully(in, fileLength);

    final int singleByteResult = in.read();
    Assertions.assertThat(singleByteResult)
        .describedAs("single byte EOF")
        .isEqualTo(-1);

    final int multiByteResult = in.read(new byte[10], 0, 10);
    Assertions.assertThat(multiByteResult)
        .describedAs("multi byte EOF")
        .isEqualTo(-1);
  }
}

@Test
public void testInputStreamSingleAndMultiByteReadsEqual() throws Throwable {
  describe("Single-byte and multi-byte read should return the same bytes");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final int fileLength = 4096;
  final byte[] contents = dataset(fileLength, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  // Read the same file twice, once per read style, then compare the two results.
  final byte[] viaMultiByte = readDataset(fileSystem, testFile, fileLength);
  final byte[] viaSingleByte = readDatasetSingleByteReads(fileSystem, testFile, fileLength);
  compareByteArrays(viaMultiByte, viaSingleByte, fileLength);
}

@Test
public void testInputStreamMixedSingleAndMultiByteReadsEqual() throws Throwable {
  describe("Mixed single and multi-byte reads on the same stream should return the same bytes");
  final Path testFile = methodPath();
  final FileSystem fileSystem = getFileSystem();
  final int fileLength = 4096;
  final byte[] contents = dataset(fileLength, 0x40, 0x80);
  createFile(fileSystem, testFile, true, contents);
  final byte[] expected = readDataset(fileSystem, testFile, fileLength);
  final byte[] actual = new byte[fileLength];
  final int chunk = 128;
  try (FSDataInputStream in = fileSystem.openFile(testFile).build().get()) {
    // Alternate between one multi-byte read of `chunk` bytes and `chunk` single-byte reads,
    // filling `actual` front to back until the whole file has been consumed.
    int cursor = 0;
    while (cursor < fileLength) {
      final int bulkRead = readNBytes(in, actual, cursor, chunk);
      if (bulkRead != chunk) {
        fail("End of file reached before reading fully.");
      }
      cursor += chunk;

      final int singleByteEnd = cursor + chunk;
      for (; cursor < singleByteEnd; cursor++) {
        final int value = in.read();
        if (value == -1) {
          fail("End of file reached before reading fully.");
        }
        actual[cursor] = (byte) value;
      }
    }
  }
  compareByteArrays(expected, actual, fileLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ public static void writeDataset(FileSystem fs, Path path, byte[] src,
* Read the file and convert to a byte dataset.
* This implements readfully internally, so that it will read
* in the file without ever having to seek()
* The read is performed through a sequence of calls to multi-byte
* {@link InputStream#read(byte[], int, int)}.
* @param fs filesystem
* @param path path to read from
* @param len length of data to read
Expand All @@ -242,6 +244,57 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
return dest;
}

/**
 * Load the first {@code len} bytes of a file into a newly allocated array,
 * fetching every byte with its own call to the single-byte
 * {@link InputStream#read()}.
 * @param fs filesystem
 * @param path path to read from
 * @param len length of data to read
 * @return the bytes
 * @throws IOException IO problems, including an {@link EOFException} if the
 * stream ends before {@code len} bytes have been read
 */
public static byte[] readDatasetSingleByteReads(FileSystem fs, Path path, int len)
    throws IOException {
  final byte[] contents = new byte[len];
  try (FSDataInputStream stream = fs.open(path)) {
    int filled = 0;
    while (filled < len) {
      final int value = stream.read();
      if (value == -1) {
        // The file is shorter than the caller expected.
        throw new EOFException("End of file reached before reading fully.");
      }
      contents[filled] = (byte) value;
      filled++;
    }
  }
  return contents;
}

/**
 * Fill {@code b}, starting at index {@code off}, with up to {@code len} bytes
 * from the stream. {@link InputStream#read(byte[], int, int)} is invoked
 * repeatedly until either {@code len} bytes have been stored or the stream
 * reports end of file. This is comparable to the {@code readNBytes} methods
 * that later JDK releases added to {@code InputStream}; here the caller
 * supplies the target buffer.
 * A {@code len} of zero or less performs no read and returns 0.
 * @param is input stream to read
 * @param b target buffer
 * @param off offset within {@code b}
 * @param len length of data to read
 * @return number of bytes read; less than {@code len} only if end of file was reached first
 * @throws IOException IO problems
 */
public static int readNBytes(InputStream is, byte[] b, int off, int len)
    throws IOException {
  int position = off;
  int remaining = len;
  while (remaining > 0) {
    final int count = is.read(b, position, remaining);
    if (count < 0) {
      // End of stream: report whatever was gathered so far.
      return position - off;
    }
    position += count;
    remaining -= count;
  }
  return position - off;
}

/**
* Read a file, verify its length and contents match the expected array.
* @param fs filesystem
Expand Down

0 comments on commit 617d9b5

Please sign in to comment.