Commit 5d3d46f

Issue178 (#179)

* fixes #178: remove OpenTracing as it is replaced by OpenTelemetry
* add pre-commit config
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

1 parent 6909c61

33 files changed: +96 −130 lines

.pre-commit-config.yaml (new file, +14)

```yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
  - repo: https://github.com/networknt/pre-commit-hook-keyword
    rev: f17c4de14fc24420f6768c19cad06ba03af06d86
    hooks:
      - id: keywordscan
        args: ["--keywords=c3VubGlmZQ==,Y2liYw==,c3VuIGxpZmU="]
        types: ["text"]
```
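The `--keywords` values passed to the `keywordscan` hook look like base64-encoded strings. A minimal sketch to decode them locally, assuming plain standard base64 (standard library only; not part of the hook itself):

```python
import base64

# The --keywords value from the hook config above (assumed base64-encoded).
keywords = "c3VubGlmZQ==,Y2liYw==,c3VuIGxpZmU="

# Decode each comma-separated token back to its plaintext keyword.
decoded = [base64.b64decode(k).decode("utf-8") for k in keywords.split(",")]
print(decoded)  # → ['sunlife', 'cibc', 'sun life']
```

This suggests the hook blocks commits containing certain organization names without exposing them in plaintext in the config.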

README.md (+16 −17)

Whitespace-only changes applied by the new pre-commit hooks: trailing whitespace stripped from lines in the "Why Reverse Proxy" section, the feature list, the Kafka sidecar setup instructions, the DLQ and active-consumer-workflow sections, and the "To learn how to use this proxy" heading; the trailing blank line at the end of the file was also removed.

doc/ksql.md (+6 −6)

Whitespace-only changes: trailing whitespace stripped from five lines in the sample request body (`"firstName"` fields), the `CREATE TABLE USERS` statement, and the curl example's `--data-raw` line; the end-of-file newline was fixed on the closing code fence.
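The curl example touched in this file posts a JSON body to the sidecar's `/ksqldb/active` endpoint. A small sketch of building that request body programmatically, using only the two fields shown in the sample above (the full payload in doc/ksql.md may carry more fields):

```python
import json

# Request body for POST /ksqldb/active, per the sample in doc/ksql.md.
payload = {
    "offset": "earliest",
    "deserializationError": False,
}
body = json.dumps(payload)
print(body)  # → {"offset": "earliest", "deserializationError": false}
```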

docker-compose-demo.yml (+1 −1)

Whitespace-only change: end-of-file fix on the final line (`external: true`).

docker-compose.yml (+1 −1)

Whitespace-only change: end-of-file fix on the final line (`external: true`).

src/main/java/com/networknt/mesh/kafka/AuditProducerShutdownHook.java (+1 −1)

Whitespace-only change: a whitespace-only line (line 22) cleaned before the closing brace.

src/main/java/com/networknt/mesh/kafka/AuditProducerStartupHook.java (+1 −1)

Whitespace-only change: a whitespace-only line (line 23) cleaned before the closing brace.

src/main/java/com/networknt/mesh/kafka/handler/ConsumersGroupPostHandler.java (+1 −1)

Whitespace-only change: a whitespace-only line (line 32) cleaned before the `@Override` of `handleRequest`.

src/main/java/com/networknt/mesh/kafka/handler/DeadlettersQueueActiveGetHandler.java (+1 −1)

Whitespace-only change: a whitespace-only line (line 40) cleaned before the `@Override` of `handleRequest`.

src/main/java/com/networknt/mesh/kafka/handler/ProducersTopicPostHandler.java (+2 −28)

Unused imports removed (including all `io.opentracing.*` imports) and the OpenTracing header-injection block commented out:

```diff
@@ -1,54 +1,27 @@
 package com.networknt.mesh.kafka.handler;

-import com.fasterxml.jackson.databind.node.NullNode;
-import com.google.protobuf.ByteString;
 import com.networknt.body.BodyHandler;
 import com.networknt.config.Config;
 import com.networknt.config.JsonMapper;
-import com.networknt.exception.FrameworkException;
 import com.networknt.handler.LightHttpHandler;
-import com.networknt.httpstring.AttachmentConstants;
-import com.networknt.httpstring.HttpStringConstants;
 import com.networknt.kafka.common.KafkaProducerConfig;
 import com.networknt.kafka.entity.*;
 import com.networknt.kafka.producer.*;
 import com.networknt.mesh.kafka.ProducerStartupHook;
 import com.networknt.mesh.kafka.WriteAuditLog;
-import com.networknt.server.Server;
 import com.networknt.server.ServerConfig;
 import com.networknt.service.SingletonServiceFactory;
-import com.networknt.status.Status;
 import com.networknt.utility.Constants;
 import com.networknt.kafka.entity.EmbeddedFormat;
-import com.networknt.utility.Util;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
-import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.RestService;
-import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
-import io.confluent.kafka.serializers.subject.TopicNameStrategy;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
-import io.opentracing.tag.Tags;
 import io.undertow.server.HttpServerExchange;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.AuthorizationException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.nio.charset.StandardCharsets;
-import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import static java.util.Collections.singletonList;

 /**
  * The producer endpoint that can receive request from the backend service messages and push them
@@ -139,6 +112,7 @@ public Headers populateHeaders(HttpServerExchange exchange, KafkaProducerConfig
         if(token != null) {
             headers.add(Constants.AUTHORIZATION_STRING, token.getBytes(StandardCharsets.UTF_8));
         }
+        /*
         if(config.isInjectOpenTracing()) {
             // maybe we should move this to the ProduceRecord in the future like the correlationId and traceabilityId.
             Tracer tracer = exchange.getAttachment(AttachmentConstants.EXCHANGE_TRACER);
@@ -148,7 +122,7 @@ public Headers populateHeaders(HttpServerExchange exchange, KafkaProducerConfig
                 tracer.inject(tracer.activeSpan().context(), Format.Builtin.TEXT_MAP, new KafkaHeadersCarrier(headers));
             }
         }
-
+        */
         // remove the correlationId from the HTTP header and moved it to the ProduceRecord as it is per record attribute.
         // remove the traceabilityId from the HTTP header and moved it to the ProduceRecord as it is per record attribute.
```
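The block commented out above injected the active OpenTracing span into the Kafka record headers. OpenTelemetry, which the commit message says replaces OpenTracing, propagates context as W3C Trace Context instead. A minimal, hypothetical sketch of what such a `traceparent` record header looks like (illustration only, using only the standard library; this is not the project's actual OpenTelemetry wiring):

```python
import os
import re

def make_traceparent() -> str:
    """Build a W3C Trace Context 'traceparent' header value:
    version-traceid-spanid-flags, all lowercase hex."""
    trace_id = os.urandom(16).hex()  # 32 hex chars
    span_id = os.urandom(8).hex()    # 16 hex chars
    return f"00-{trace_id}-{span_id}-01"

# Kafka record headers are (key, bytes) pairs; mimic adding the header
# the way the removed tracer.inject(...) call populated `headers`.
headers = [("traceparent", make_traceparent().encode("utf-8"))]
assert re.fullmatch(r"00-[0-9a-f]{32}-[0-9a-f]{16}-01", headers[0][1].decode())
```

In real code an OpenTelemetry propagator would fill the header from the current span context rather than from random bytes.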

src/main/java/com/networknt/mesh/kafka/handler/TopicReplayPostHandler.java (+2 −6)

Unused imports (including the `io.opentracing.*` imports) removed and the OpenTracing injection block commented out:

```diff
@@ -11,17 +11,12 @@
 import com.networknt.kafka.common.KafkaProducerConfig;
 import com.networknt.kafka.common.KafkaStreamsConfig;
 import com.networknt.kafka.entity.*;
-import com.networknt.kafka.producer.KafkaHeadersCarrier;
 import com.networknt.kafka.producer.NativeLightProducer;
 import com.networknt.kafka.producer.SidecarProducer;
-import com.networknt.server.Server;
 import com.networknt.server.ServerConfig;
 import com.networknt.service.SingletonServiceFactory;
 import com.networknt.utility.Constants;
 import com.networknt.utility.StringUtils;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
-import io.opentracing.tag.Tags;
 import io.undertow.server.HttpServerExchange;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -175,6 +170,7 @@ public Headers populateHeaders(HttpServerExchange exchange, KafkaProducerConfig
         if(token != null) {
             headers.add(Constants.AUTHORIZATION_STRING, token.getBytes(StandardCharsets.UTF_8));
         }
+        /*
         if(config.isInjectOpenTracing()) {
             // maybe we should move this to the ProduceRecord in the future like the correlationId and traceabilityId.
             Tracer tracer = exchange.getAttachment(AttachmentConstants.EXCHANGE_TRACER);
@@ -184,7 +180,7 @@ public Headers populateHeaders(HttpServerExchange exchange, KafkaProducerConfig
                 tracer.inject(tracer.activeSpan().context(), Format.Builtin.TEXT_MAP, new KafkaHeadersCarrier(headers));
             }
         }
-
+        */
         // remove the correlationId from the HTTP header and moved it to the ProduceRecord as it is per record attribute.
         // remove the traceabilityId from the HTTP header and moved it to the ProduceRecord as it is per record attribute.
```

src/main/java/com/networknt/mesh/kafka/util/ConvertToList.java (+1 −1)

Whitespace-only change: end-of-file newline added after the closing brace (line 21).

src/main/java/com/networknt/mesh/kafka/util/CreateConsumerGroup.java (+2 −2)

Whitespace-only change: trailing whitespace stripped from two empty Javadoc comment lines (lines 52 and 60) above `getInstanceId` and `setInstanceId`.

src/main/java/com/networknt/mesh/kafka/util/KafkaConsumerManagerFactory.java (+1 −1)

Whitespace-only change: end-of-file newline added after the closing brace (line 13).

src/main/java/com/networknt/mesh/kafka/util/StreamsFactory.java (+1 −1)

Whitespace-only change: end-of-file newline added after the closing brace (line 35).

src/main/resources/config/kafka-consumer.yml (−1)

Whitespace-only change: the trailing blank line after `backendConnectionReset: ${kafka-consumer.backendConnectionReset:false}` removed.

src/test/java/com/networknt/mesh/kafka/handler/ConsumersGroupInstancesInstanceAssignmentsGetHandlerTest.java (+3 −4)

Whitespace-only changes: trailing whitespace stripped around the `//customized header parameters` comment and the `latch.await()` call, and the trailing blank line at the end of the file removed.

0 commit comments