Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added

- Added support for callbacks to mark requests as failed.
- Added support for OIDC Bearer tokens.

## [0.15.0] - 2024-07-30
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,18 @@ and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL proper
is provided.


- Callback Errors:

Throw a [FailedRequestException](src/main/java/com/getindata/connectors/http/FailedRequestException.java) to indicate a
failed request.

This allows control over the connector's behavior when an HTTP response does not meet your expectations
whether based on the response body or headers.

Currently, the only side effect is to incremenet the [numRecordsSendErrors counter](https://github.com/getindata/flink-http-connector?tab=readme-ov-file#http-sink-2), as the connector does not
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo incremenet

support retries yet. However, once retry functionality is implemented, this will allow users to specify if requests should be retried.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the connector does support retries



## HTTP status code handler
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
By default all 400s and 500s response codes will be interpreted as error code.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.getindata.connectors.http;

/**
* Exception thrown from a {@link HttpPostRequestCallback}
* when a request should be considered as failed.
*
* <p>This exception is caught by the
* {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient}
* and {@link com.getindata.connectors.http.internal.table.lookup.JavaNetHttpPollingClient}
*/
public class FailedRequestException extends Exception {
public FailedRequestException(String message) {
super(message);
}

public FailedRequestException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ void call(
RequestT requestEntry,
String endpointUrl,
Map<String, String> headerMap
);
) throws FailedRequestException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;

import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
Expand Down Expand Up @@ -98,13 +99,20 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();

httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
var failedCallback = false;

try {
httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
} catch (FailedRequestException e) {
failedCallback = true;
log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
}

// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
statusCodeChecker.isErrorCode(optResponse.get().statusCode()) ||
failedCallback) {
failedResponses.add(sinkRequestEntry);
} else {
successfulResponses.add(sinkRequestEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.StringUtils;

import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
Expand Down Expand Up @@ -89,7 +90,14 @@ private Optional<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
try {
this.httpPostRequestCallback.call(
response, request, "endpoint", Collections.emptyMap()
);
} catch (FailedRequestException e) {
log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
return Optional.empty();
Copy link
Collaborator

@davidradl davidradl Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we not throw an Exception here? There is already precedence as this method already throws throws IOException.

Why is this not log.error - the text says Error?

Copy link
Author

@amstee amstee Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same reason as below, I don't think there's a need to bubble up the exception, just treat this request as failed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code has changed in this area. Prior to this change all failures resulted in the job ending.

It is possible to specify in the table config the list of status codes to count as ignore, retry, failed or successful. If the logic in the callback is around status codes, then we do not need this extra code change. If your callback is doing more sophisticated checking of the response payload, then this would make sense, the exception should be thrown if you want to fail the call, continue-on-error true will determine if the job carries on or not. I do not think we should swallow the Exception as this is not in line with the design of this connector. WDYT?

}

if (response == null) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.io.File;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -19,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.HttpsConnectionTestBase;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
Expand Down Expand Up @@ -62,6 +67,30 @@ public void testHttpConnection() {
batchRequestSubmitterFactory);
}

@Test
public void testHttpPostRequestCallbackWithFailedRequestException()
throws ExecutionException, InterruptedException {
wireMockServer = new WireMockServer(SERVER_PORT);
wireMockServer.start();
mockEndPoint(wireMockServer);

JavaNetSinkHttpClient client =
new JavaNetSinkHttpClient(
properties,
new TestPostRequestCallbackWithException(),
headerPreprocessor,
perRequestSubmitterFactory);
HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]);
SinkHttpClientResponse response =
client.putRequests(
Collections.singletonList(requestEntry),
"https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT
).get();

assertThat(response.getSuccessfulRequests()).isEmpty();
assertThat(response.getFailedRequests()).isNotEmpty();
}

@Test
public void testHttpsConnectionWithSelfSignedCert() {

Expand Down Expand Up @@ -366,4 +395,17 @@ private void mockEndPointWithBasicAuth(WireMockServer wireMockServer) {
.withBody("{}"))
);
}

public static class TestPostRequestCallbackWithException
implements HttpPostRequestCallback<HttpRequest> {
@Override
public void call(
HttpResponse<String> response,
HttpRequest requestEntry,
String endpointUrl,
Map<String, String> headerMap
) throws FailedRequestException {
throw new FailedRequestException("Test exception");
}
}
}