Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming bodies 3 #2754

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
70 changes: 35 additions & 35 deletions core/src/main/java/feign/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,15 @@ HttpURLConnection convertAndSend(Request request, Options options) throws IOExce
boolean deflateEncodedRequest = this.isDeflate(contentEncodingValues);

boolean hasAcceptHeader = false;
Integer contentLength = null;
Long contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
contentLength = Long.valueOf(value);
connection.addRequestProperty(field, value);
}
}
Expand All @@ -201,47 +201,47 @@ else if (field.equals(ACCEPT_ENCODING)) {
}
}
}

// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}

byte[] body = request.body();

if (body != null) {
/*
* Ignore disableRequestBuffering flag if the empty body was set, to ensure that internal
* retry logic applies to such requests.
*/
if (disableRequestBuffering) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(body);
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
if (request.hasBody()) {
/*
* Ignore disableRequestBuffering flag if the empty body was set, to ensure that internal
* retry logic applies to such requests.
*/
if (disableRequestBuffering) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
request.sendBodyToOutputStream(out);
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
} else {
if (request.httpMethod().isWithBody()) {
// To use this Header, set 'sun.net.http.allowRestrictedHeaders' property true.
connection.addRequestProperty("Content-Length", "0");
}
}
}

if (body == null && request.httpMethod().isWithBody()) {
// To use this Header, set 'sun.net.http.allowRestrictedHeaders' property true.
connection.addRequestProperty("Content-Length", "0");
}


return connection;
}

Expand Down
37 changes: 10 additions & 27 deletions core/src/main/java/feign/FeignException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package feign;

import static feign.Util.*;
import static feign.Util.UTF_8;
import static feign.Util.caseInsensitiveCopyOf;
import static feign.Util.checkNotNull;
import static java.lang.String.format;
import static java.util.regex.Pattern.CASE_INSENSITIVE;

import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -27,15 +28,16 @@
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import feign.utils.ContentTypeParser;

/** Origin exception type for all Http Apis. */
// TODO: KD - FeignException does not currently support streaming bodies. Usually, error responses are short enough that this isn't an issue, but we may want to eventually replace byte[] responseBody with Response.Body responseBody;
// TODO: KD - for that matter, why aren't we just capturing the response itself instead of the headers and body as separate parameters? errorReading() captures the response...
public class FeignException extends RuntimeException {

private static final String EXCEPTION_MESSAGE_TEMPLATE_NULL_REQUEST =
Expand Down Expand Up @@ -500,9 +502,7 @@ public String build() {

private String getBodyAsString(byte[] body, Map<String, Collection<String>> headers) {
Charset charset = getResponseCharset(headers);
if (charset == null) {
charset = Util.UTF_8;
}

return getResponseBody(body, charset);
}

Expand All @@ -529,26 +529,9 @@ private String getResponseBodyPreview(byte[] body, Charset charset) {

private static Charset getResponseCharset(Map<String, Collection<String>> headers) {

Collection<String> strings = headers.get("content-type");
if (strings == null || strings.isEmpty()) {
return null;
}

Pattern pattern = Pattern.compile(".*charset=\"?([^\\s|^;|^\"]+).*", CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(strings.iterator().next());
if (!matcher.lookingAt()) {
return null;
}
return ContentTypeParser.parseContentTypeFromHeaders(headers, "").getCharset().orElse(Util.UTF_8);

String group = matcher.group(1);
try {
if (!Charset.isSupported(group)) {
return null;
}
} catch (IllegalCharsetNameException ex) {
return null;
}
return Charset.forName(group);
}

}
}
36 changes: 14 additions & 22 deletions core/src/main/java/feign/InvocationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
import static feign.FeignException.errorReading;
import static feign.Util.ensureClosed;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Type;

import feign.codec.DecodeException;
import feign.codec.Decoder;
import feign.codec.ErrorDecoder;
import java.io.IOException;
import java.lang.reflect.Type;

public class InvocationContext {
private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L;
private final String configKey;
private final Decoder decoder;
private final ErrorDecoder errorDecoder;
Expand Down Expand Up @@ -68,9 +69,11 @@ public Response response() {

public Object proceed() throws Exception {
if (returnType == Response.class) {
return disconnectResponseBodyIfNeeded(response);
return response;
}

boolean noClose = false;

try {
final boolean shouldDecodeResponseBody =
(response.status() >= 200 && response.status() < 300)
Expand All @@ -86,36 +89,25 @@ public Object proceed() throws Exception {
}

Class<?> rawType = Types.getRawType(returnType);

// if the return type is closable, then it is the callers responsibility to close.
if (Closeable.class.isAssignableFrom(rawType)) {
noClose = true;
}

if (TypedResponse.class.isAssignableFrom(rawType)) {
Type bodyType = Types.resolveLastTypeParameter(returnType, TypedResponse.class);
return TypedResponse.builder(response).body(decode(response, bodyType)).build();
}

return decode(response, returnType);
} finally {
if (closeAfterDecode) {
if (closeAfterDecode && !noClose) {
ensureClosed(response.body());
}
}
}

private static Response disconnectResponseBodyIfNeeded(Response response) throws IOException {
final boolean shouldDisconnectResponseBody =
response.body() != null
&& response.body().length() != null
&& response.body().length() <= MAX_RESPONSE_BUFFER_SIZE;
if (!shouldDisconnectResponseBody) {
return response;
}

try {
final byte[] bodyData = Util.toByteArray(response.body().asInputStream());
return response.toBuilder().body(bodyData).build();
} finally {
ensureClosed(response.body());
}
}

private Object decode(Response response, Type returnType) {
try {
return decoder.decode(response, returnType);
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/java/feign/Logger.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,15 @@ protected void logRequest(String configKey, Level logLevel, Request request) {
}
}

int bodyLength = 0;
if (request.body() != null) {
bodyLength = request.length();
if (request.hasBody()) {
if (logLevel.ordinal() >= Level.FULL.ordinal()) {
String bodyText =
request.charset() != null ? new String(request.body(), request.charset()) : null;
log(configKey, ""); // CRLF
log(configKey, "%s", bodyText != null ? bodyText : "Binary data");
log(configKey, request.bodyAsString());
}
log(configKey, "---> END HTTP");
} else {
log(configKey, "---> END HTTP (no body)");
}
log(configKey, "---> END HTTP (%s-byte body)", bodyLength);
}
}

Expand Down
Loading