Skip to content

Commit 7390e5f

Browse files
authored
Merge pull request #37 from digipost/inputstreamIterator
Add InputStreamIterator
2 parents 65db4df + 524aa49 commit 7390e5f

File tree

2 files changed

+292
-0
lines changed

2 files changed

+292
-0
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (C) Posten Norge AS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package no.digipost.io;
17+
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.UncheckedIOException;
21+
import java.util.Iterator;
22+
import java.util.NoSuchElementException;
23+
24+
import static java.lang.Math.toIntExact;
25+
import static no.digipost.DiggBase.friendlyName;
26+
import static no.digipost.DiggExceptions.exceptionNameAndMessage;
27+
28+
/**
29+
* InputStreamIterator is an {@link Iterator} reading from an {@link InputStream} in chunks
30+
* where each chunk is returned as the next element in the iterable.
31+
* When the input stream is fully consumed the iterator has no more elements.
32+
*/
33+
public class InputStreamIterator implements Iterator<byte[]> {
34+
private final InputStream inputStream;
35+
private final int chunkSizeBytes;
36+
private byte[] next;
37+
private Boolean hasNext;
38+
39+
/**
40+
* @param inputStream The input stream to iterate over
41+
* @param chunkSize DataSize should not be too big since that defeats the purpose of this iterator.
42+
*/
43+
public InputStreamIterator(InputStream inputStream, DataSize chunkSize) {
44+
this(inputStream, toIntExact(chunkSize.toBytes()));
45+
}
46+
47+
public InputStreamIterator(InputStream inputStream, int chunkSizeBytes) {
48+
this.inputStream = inputStream;
49+
this.chunkSizeBytes = chunkSizeBytes;
50+
}
51+
52+
private byte[] loadNextChunk() {
53+
byte[] chunk = new byte[chunkSizeBytes];
54+
int bytesRead = 0;
55+
try {
56+
bytesRead = inputStream.read(chunk);
57+
if (bytesRead == -1) {
58+
return null;
59+
}
60+
} catch (IOException e) {
61+
throw new UncheckedIOException(
62+
"Failed reading next chunk of up to " + chunkSizeBytes +
63+
" bytes from " + friendlyName(inputStream.getClass()) +
64+
" because " + exceptionNameAndMessage(e), e);
65+
}
66+
67+
if (bytesRead < chunkSizeBytes) {
68+
// resize the buffer if less data was read
69+
byte[] smallerBuffer = new byte[bytesRead];
70+
System.arraycopy(chunk, 0, smallerBuffer, 0, bytesRead);
71+
chunk = smallerBuffer;
72+
}
73+
74+
return chunk;
75+
}
76+
77+
/**
78+
*
79+
* @return true if the iteration has more elements
80+
*
81+
* @throws UncheckedIOException if the wrapped InputStream throws an IOException
82+
*/
83+
@Override
84+
public boolean hasNext() {
85+
if (hasNext == null) {
86+
next = loadNextChunk();
87+
hasNext = (next != null);
88+
}
89+
90+
return hasNext;
91+
}
92+
93+
@Override
94+
public byte[] next() {
95+
if (!hasNext()) {
96+
throw new NoSuchElementException("No more data to read");
97+
}
98+
99+
byte[] result = next;
100+
hasNext = null;
101+
next = null;
102+
return result;
103+
}
104+
105+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright (C) Posten Norge AS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package no.digipost.io;
17+
18+
import org.apache.commons.io.IOUtils;
19+
import org.apache.commons.io.input.BrokenInputStream;
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.io.UncheckedIOException;
27+
import java.nio.charset.Charset;
28+
import java.util.ArrayList;
29+
import java.util.Iterator;
30+
import java.util.List;
31+
import java.util.NoSuchElementException;
32+
import java.util.Objects;
33+
import java.util.zip.ZipEntry;
34+
import java.util.zip.ZipInputStream;
35+
import java.util.zip.ZipOutputStream;
36+
37+
import static java.nio.charset.StandardCharsets.UTF_8;
38+
import static no.digipost.DiggExceptions.runUnchecked;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
import static org.hamcrest.Matchers.containsInAnyOrder;
41+
import static org.hamcrest.Matchers.containsStringIgnoringCase;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.hamcrest.core.StringContains.containsString;
44+
import static org.junit.jupiter.api.Assertions.assertThrows;
45+
import static uk.co.probablyfine.matchers.Java8Matchers.where;
46+
import static uk.co.probablyfine.matchers.Java8Matchers.whereNot;
47+
48+
class InputStreamIteratorTest {
49+
50+
@Test
51+
void fully_reads_the_input_stream() throws Exception {
52+
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8));) {
53+
String consumedFromIterator = consumeToString(new InputStreamIterator(inputStream, DataSize.bytes(2)), UTF_8);
54+
55+
assertThat(consumedFromIterator, is("Some data"));
56+
}
57+
}
58+
59+
@Test
60+
void cannot_instantiate_with_too_big_chunk_size() throws Exception {
61+
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8))) {
62+
Exception thrown = assertThrows(ArithmeticException.class, () -> new InputStreamIterator(inputStream, DataSize.MAX));
63+
64+
assertThat(thrown, where(Exception::getMessage, containsStringIgnoringCase("integer overflow")));
65+
}
66+
}
67+
68+
@Test
69+
void throws_if_next_is_called_with_no_more_elements() throws Exception {
70+
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("Some data".getBytes(UTF_8));) {
71+
InputStreamIterator iterator = new InputStreamIterator(inputStream, 2);
72+
73+
assertThat(consumeToString(iterator, UTF_8), is("Some data"));
74+
assertThat(iterator, whereNot(Iterator::hasNext));
75+
76+
assertThrows(NoSuchElementException.class, iterator::next);
77+
}
78+
}
79+
80+
@Test
81+
void throws_exception_if_input_stream_fails() throws Exception {
82+
InputStreamIterator iterator = new InputStreamIterator(new BrokenInputStream(), 3);
83+
84+
Exception ex = assertThrows(UncheckedIOException.class, iterator::next);
85+
86+
assertThat(ex, where(Exception::getMessage, containsString("BrokenInputStream")));
87+
}
88+
89+
@Test
90+
void worksWithInputStreamHavingMultipleEntries() throws IOException {
91+
ZipEntryContent file1 = new ZipEntryContent("file1.txt", "This is file1");
92+
ZipEntryContent file2 = new ZipEntryContent("file2.txt", "This is file2");
93+
byte[] zipFile = zip(file1, file2);
94+
95+
96+
List<ZipEntryContent> entriesReadConventionally = new ArrayList<>();
97+
try (ZipInputStream zipReader = new ZipInputStream(new ByteArrayInputStream(zipFile))) {
98+
for (ZipEntry nextEntry = zipReader.getNextEntry(); nextEntry != null; nextEntry = zipReader.getNextEntry()) {
99+
entriesReadConventionally.add(ZipEntryContent.read(nextEntry, zipReader));
100+
}
101+
}
102+
103+
assertThat(entriesReadConventionally, containsInAnyOrder(file1, file2));
104+
105+
106+
List<ZipEntryContent> entriesReadInChunks = new ArrayList<>();
107+
try (ZipInputStream zipReader = new ZipInputStream(new ByteArrayInputStream(zipFile))) {
108+
for (ZipEntry nextEntry = zipReader.getNextEntry(); nextEntry != null; nextEntry = zipReader.getNextEntry()) {
109+
String content = consumeToString(new InputStreamIterator(zipReader, DataSize.bytes(2)), UTF_8);
110+
entriesReadInChunks.add(new ZipEntryContent(nextEntry, content));
111+
}
112+
}
113+
114+
assertThat(entriesReadInChunks, containsInAnyOrder(file1, file2));
115+
}
116+
117+
private static String consumeToString(InputStreamIterator iterator, Charset charset) {
118+
byte[] bytes = consumeAndFlatten(iterator);
119+
return new String(bytes, charset);
120+
}
121+
122+
private static byte[] consumeAndFlatten(InputStreamIterator iterator) {
123+
ByteArrayOutputStream chunkConsumer = new ByteArrayOutputStream();
124+
for (byte[] chunk : (Iterable<byte[]>) () -> iterator) {
125+
runUnchecked(() -> chunkConsumer.write(chunk));
126+
}
127+
return chunkConsumer.toByteArray();
128+
}
129+
130+
131+
private static final class ZipEntryContent {
132+
133+
static ZipEntryContent read(ZipEntry entry, InputStream contentStream) throws IOException {
134+
return new ZipEntryContent(entry.getName(), IOUtils.toString(contentStream, UTF_8));
135+
}
136+
137+
final String name;
138+
final String content;
139+
140+
ZipEntryContent(ZipEntry entry, String content) {
141+
this(entry.getName(), content);
142+
}
143+
144+
ZipEntryContent(String name, String content) {
145+
this.name = name;
146+
this.content = content;
147+
}
148+
149+
public void writeTo(ZipOutputStream zip) throws IOException {
150+
zip.putNextEntry(new ZipEntry(name));
151+
zip.write(content.getBytes(UTF_8));
152+
}
153+
154+
@Override
155+
public String toString() {
156+
return "zip entry '" + name + "': " + content;
157+
}
158+
159+
@Override
160+
public boolean equals(Object o) {
161+
if (o instanceof ZipEntryContent) {
162+
ZipEntryContent that = (ZipEntryContent) o;
163+
return Objects.equals(this.name, that.name) && Objects.equals(this.content, that.content);
164+
}
165+
return false;
166+
}
167+
168+
@Override
169+
public int hashCode() {
170+
return Objects.hash(name, content);
171+
}
172+
173+
}
174+
175+
private static byte[] zip(ZipEntryContent ... entries) {
176+
ByteArrayOutputStream zipOutput = new ByteArrayOutputStream();
177+
try (ZipOutputStream zipWriter = new ZipOutputStream(zipOutput)) {
178+
for (ZipEntryContent entry : entries) {
179+
entry.writeTo(zipWriter);
180+
}
181+
} catch (IOException e) {
182+
throw new UncheckedIOException(e);
183+
}
184+
return zipOutput.toByteArray();
185+
}
186+
187+
}

0 commit comments

Comments
 (0)