HADOOP-19400: Expand specification and contract test coverage for InputStream reads. #7367

Open
wants to merge 1 commit into
base: trunk
@@ -129,10 +129,11 @@ as implicitly set in `pos`.
#### Preconditions

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
buffer != null else raise NullPointerException, IllegalArgumentException
offset >= 0 else raise IndexOutOfBoundsException
length >= 0 else raise IndexOutOfBoundsException, IllegalArgumentException
offset < len(buffer) else raise IndexOutOfBoundsException
length <= len(buffer) - offset else raise IndexOutOfBoundsException
pos >= 0 else raise EOFException, IOException

Exceptions that may be raised on precondition failure are
@@ -175,6 +176,19 @@ must block until at least one byte is returned. Thus, for any data source
of length greater than zero, repeated invocations of this `read()` operation
will eventually read all the data.
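
The guarantee that repeated reads eventually consume all the data can be sketched with plain JDK streams. This is a minimal illustration, not Hadoop code: `ShortReadDrainSketch`, `shortReads` and `drain` are hypothetical names, and the 3-byte cap is an assumption standing in for any source that returns fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch: draining a stream that only ever returns short reads. */
final class ShortReadDrainSketch {
  private ShortReadDrainSketch() {
  }

  /** Wrap the data in a stream that returns at most 3 bytes per multi-byte read. */
  static InputStream shortReads(byte[] data) {
    return new ByteArrayInputStream(data) {
      @Override
      public synchronized int read(byte[] b, int off, int len) {
        // Cap the request so callers see fewer bytes than they asked for.
        return super.read(b, off, Math.min(len, 3));
      }
    };
  }

  /** Call read() repeatedly until it reports EOF (-1), collecting every byte. */
  static byte[] drain(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[8];
    int n;
    while ((n = in.read(buf, 0, buf.length)) != -1) {
      // Only the first n bytes of buf are valid after each call.
      out.write(buf, 0, n);
    }
    return out.toByteArray();
  }
}
```

A caller that ignores the return value and assumes the buffer was filled would silently lose data on such a stream, which is why the loop uses the returned count on every iteration.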

#### Implementation Notes
Contributor

  1. Is there a way to add these specification statements to the Python statements? Those are designed to be what people write tests from.
  2. Please use SHALL/MUST/MAY rather than other terms such as "is expected to". Yes, yours is the better prose, but we want no ambiguity here.

Contributor Author

Thanks for the review, Steve.

  1. I pushed up a change trying to move some of this up. A few of the implementation notes remain to state the unique implementation choices of HDFS.
  2. The RFC verbiage is important. Thanks!


1. If the caller passes a `null` buffer, then an unchecked exception MUST be thrown. The base JDK
`InputStream` implementation throws `NullPointerException`. HDFS historically used
`IllegalArgumentException`. Implementations MAY use either of these.
1. If the caller passes a negative value for `length`, then an unchecked exception MUST be thrown.
The base JDK `InputStream` implementation throws `IndexOutOfBoundsException`. HDFS historically used
`IllegalArgumentException`. Implementations MAY use either of these.
1. Reads through any method MUST return the same data.
1. Callers MAY interleave calls to different read methods (single-byte and multi-byte) on the same
stream. The stream MUST return the same underlying data, regardless of the specific read calls or
their ordering.
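
The argument checks described in these notes can be sketched as a small standalone helper. This is an illustration under stated assumptions, not the code of any Hadoop stream: `ReadArgsSketch` and `checkReadArgs` are hypothetical names, and the checks follow the JDK `InputStream.read(byte[], int, int)` convention, which accepts `offset == buffer.length` when `length` is zero.

```java
/** Hypothetical sketch of JDK-style argument validation for a multi-byte read. */
final class ReadArgsSketch {
  private ReadArgsSketch() {
  }

  /**
   * Validate the arguments of read(buffer, offset, length).
   * Throws the JDK-style exceptions; per the notes above, an implementation
   * MAY instead raise IllegalArgumentException for a null buffer or a
   * negative length.
   */
  static void checkReadArgs(byte[] buffer, int offset, int length) {
    if (buffer == null) {
      throw new NullPointerException("buffer");
    }
    // Written as a subtraction so that offset + length cannot overflow.
    if (offset < 0 || length < 0 || length > buffer.length - offset) {
      throw new IndexOutOfBoundsException(
          "offset=" + offset + ", length=" + length
              + ", buffer.length=" + buffer.length);
    }
  }
}
```

Both exception types are unchecked, so a contract test that accepts either the JDK-style or the HDFS-style exception only needs to match on the class of what was thrown.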

### <a name="Seekable.seek"></a>`Seekable.seek(s)`


@@ -38,6 +38,9 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.compareByteArrays;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readDatasetSingleByteReads;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readNBytes;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -408,4 +411,137 @@ public void testFloatingPointLength() throws Throwable {
.isEqualTo(len);
}

  @Test
  public void testInputStreamReadNullBuffer() throws Throwable {
    // The JDK base InputStream (and by extension LocalFSFileInputStream) throws
    // NullPointerException. Historically, DFSInputStream has thrown IllegalArgumentException
    // instead. Allow either behavior.
    describe("Attempting to read into a null buffer should throw IllegalArgumentException or " +
        "NullPointerException");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      Assertions.assertThatThrownBy(() -> is.read(null, 0, 10))
          .isInstanceOfAny(IllegalArgumentException.class, NullPointerException.class);
Contributor

I've not seen this before; interesting.
I wonder if we could extend intercept() to take a list of classes.

I prefer intercept() because it does two things AssertJ doesn't:

  • returns the caught exception for further analysis
  • if no exception is raised, the failure reports the result of any lambda expression which doesn't return void.

Contributor Author

That's an interesting idea. I just tried going down the path of writing an interceptAny accepting a list of exception classes. The problem we run into though is that we want to parameterize on the exception type E and return the specific type. If the list can contain any exception type, then we can't put a meaningful bound on the returned E. It seems all we can do is devolve back to Throwable.

I also don't currently have the use case (yet) of chaining additional analysis on that returned exception.

I think I'll hold off on this.
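
The typing problem described above can be sketched in isolation. This is a hypothetical helper written only to illustrate the point: `InterceptAnySketch.interceptAny` is not part of `LambdaTestUtils`, and its signature is an assumption. Because the accepted classes share no bound narrower than `Throwable`, that is the only type the method can promise to return.

```java
import java.util.Arrays;
import java.util.concurrent.Callable;

/** Hypothetical sketch of an intercept-style helper that accepts several exception classes. */
final class InterceptAnySketch {
  private InterceptAnySketch() {
  }

  /**
   * Evaluate the callable and return the exception it raised if that exception
   * is an instance of any of the expected classes.
   * The return type is Throwable: no more specific bound covers every class
   * the caller might pass in.
   */
  static Throwable interceptAny(Callable<?> eval, Class<?>... expected) {
    Object result;
    try {
      result = eval.call();
    } catch (Throwable t) {
      for (Class<?> clazz : expected) {
        if (clazz.isInstance(t)) {
          return t;
        }
      }
      // Raised, but not one of the expected types: fail and keep the cause.
      throw new AssertionError("Unexpected exception type: " + t, t);
    }
    // No exception at all: fail, reporting whatever the call returned.
    throw new AssertionError("Expected one of " + Arrays.toString(expected)
        + " but the call returned: " + result);
  }
}
```

A caller wanting to inspect type-specific state would have to downcast the result, for example after an `instanceof` check, which is the loss of type information the comment refers to.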

    }
  }

  @Test
  public void testInputStreamReadNegativePosition() throws Throwable {
    describe("Attempting to read into a negative buffer offset should throw " +
        "IndexOutOfBoundsException");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      Assertions.assertThatThrownBy(() -> is.read(new byte[10], -1, 10))
          .isInstanceOf(IndexOutOfBoundsException.class);
    }
  }

  @Test
  public void testInputStreamReadNegativeLength() throws Throwable {
    // The JDK base InputStream (and by extension LocalFSFileInputStream) throws
    // IndexOutOfBoundsException. Historically, DFSInputStream has thrown IllegalArgumentException
    // instead. Allow either behavior.
    describe("Attempting to read with a negative length should throw IllegalArgumentException or " +
        "IndexOutOfBoundsException");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, -1))
          .isInstanceOfAny(IllegalArgumentException.class, IndexOutOfBoundsException.class);
    }
  }

  @Test
  public void testInputStreamReadTooLong() throws Throwable {
    describe("Attempting a read longer than the buffer should throw IndexOutOfBoundsException");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      Assertions.assertThatThrownBy(() -> is.read(new byte[10], 0, 11))
          .isInstanceOf(IndexOutOfBoundsException.class);
    }
  }

  @Test
  public void testInputStreamReadZeroLengthRead() throws Throwable {
    describe("Reading 0 bytes is a no-op");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      Assertions.assertThat(is.read(new byte[10], 0, 0)).describedAs("bytes read").isEqualTo(0);
    }
  }

  @Test
  public void testInputStreamConsistentEOF() throws Throwable {
    describe("Both single-byte and multi-byte read should report EOF after consuming stream");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      IOUtils.skipFully(is, len);
      Assertions.assertThat(is.read()).describedAs("single byte EOF").isEqualTo(-1);
      Assertions.assertThat(is.read(new byte[10], 0, 10)).describedAs("multi byte EOF")
          .isEqualTo(-1);
    }
  }

  @Test
  public void testInputStreamSingleAndMultiByteReadsEqual() throws Throwable {
    describe("Single-byte and multi-byte read should return the same bytes");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    byte[] multiByteReads = readDataset(fs, path, len);
    byte[] singleByteReads = readDatasetSingleByteReads(fs, path, len);
    compareByteArrays(multiByteReads, singleByteReads, len);
  }

  @Test
  public void testInputStreamMixedSingleAndMultiByteReadsEqual() throws Throwable {
    describe("Mixed single and multi-byte reads on the same stream should return the same bytes");
    Path path = methodPath();
    FileSystem fs = getFileSystem();
    int len = 4096;
    createFile(fs, path, true,
        dataset(len, 0x40, 0x80));
    byte[] expected = readDataset(fs, path, len);
    byte[] actual = new byte[len];
    int readSize = 128;
    try (FSDataInputStream is = fs.openFile(path).build().get()) {
      // Each iteration consumes readSize bytes via a multi-byte read, then
      // readSize bytes via single-byte reads, so offset advances by 2 * readSize.
      for (int offset = 0; offset < len; offset += 2 * readSize) {
        if (readNBytes(is, actual, offset, readSize) != readSize) {
          fail("End of file reached before reading fully.");
        }
        for (int i = 0; i < readSize; ++i) {
          int nextByte = is.read();
          if (-1 == nextByte) {
            fail("End of file reached before reading fully.");
          }
          actual[offset + readSize + i] = (byte)nextByte;
        }
      }
    }
    compareByteArrays(expected, actual, len);
  }
}
@@ -219,6 +219,8 @@ public static void writeDataset(FileSystem fs, Path path, byte[] src,
* Read the file and convert to a byte dataset.
* This implements readfully internally, so that it will read
* in the file without ever having to seek()
* The read is performed through a sequence of calls to multi-byte
* {@link InputStream#read(byte[], int, int)}.
* @param fs filesystem
* @param path path to read from
* @param len length of data to read
@@ -242,6 +244,57 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
return dest;
}

  /**
   * Read a file completely and return the contents in a byte buffer. The read is performed through
   * repeated calls to single-byte {@link InputStream#read()}.
   * @param fs filesystem
   * @param path path to read from
   * @param len length of data to read
   * @return the bytes
   * @throws IOException IO problems
   */
  public static byte[] readDatasetSingleByteReads(FileSystem fs, Path path, int len)
      throws IOException {
    byte[] dest = new byte[len];
    try (FSDataInputStream in = fs.open(path)) {
      for (int i = 0; i < len; ++i) {
        int nextByte = in.read();
        if (-1 == nextByte) {
          throw new EOFException("End of file reached before reading fully.");
        }
        dest[i] = (byte)nextByte;
      }
    }
    return dest;
  }

  /**
   * Reads {@code len} bytes into offset {@code off} of target buffer {@code b}, up to EOF,
   * potentially blocking until enough bytes are available on the stream. This is similar to
   * {@code InputStream#readNBytes(int)} introduced in Java 11, except the caller owns allocation of
   * the target buffer.
   * @param is input stream to read
   * @param b target buffer
   * @param off offset within {@code b}
   * @param len length of data to read
   * @return number of bytes read
   * @throws IOException IO problems
   */
  public static int readNBytes(InputStream is, byte[] b, int off, int len)
      throws IOException {
    int totalBytes = 0;
    while (len > 0) {
      int nbytes = is.read(b, off, len);
      if (nbytes < 0) {
        break;
      }
      totalBytes += nbytes;
      off += nbytes;
      len -= nbytes;
    }
    return totalBytes;
  }

/**
* Read a file, verify its length and contents match the expected array.
* @param fs filesystem