Skip to content

Commit

Permalink
initial kafka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
ukclivecox committed Aug 28, 2017
1 parent 88b2cad commit 87dfccc
Show file tree
Hide file tree
Showing 20 changed files with 534 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ tests/out.log
cluster-manager/settings.sh
engine/target/
zookeeper-k8s/zookeeper.json
kafka/tests/src/prediction_pb2.py
kafka/tests/src/seldonengine_pb2.py
4 changes: 4 additions & 0 deletions api-frontend/apife.json.in
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
"name": "SELDON_ENGINE_ZK_SERVERS",
"value": "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181"
},
{
"name": "SELDON_ENGINE_KAFKA_SERVER",
"value": "kafka:9092"
},
{
"name": "SELDON_CLUSTER_MANAGER_REDIS_HOST",
"value": "redis"
Expand Down
5 changes: 5 additions & 0 deletions api-frontend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@
<version>0.0.23</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@
import org.springframework.web.bind.annotation.RestController;

import com.codahale.metrics.annotation.Timed;
import com.google.protobuf.InvalidProtocolBufferException;

import io.seldon.apife.exception.APIException;
import io.seldon.apife.exception.APIException.ApiExceptionType;
import io.seldon.apife.kafka.KafkaRequestResponseProducer;
import io.seldon.apife.pb.ProtoBufUtils;
import io.seldon.apife.service.PredictionService;
import io.seldon.protos.PredictionProtos.PredictionRequestDef;
import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;
import io.seldon.protos.PredictionProtos.PredictionResponseDef;

@RestController
public class RestClientController {
Expand All @@ -27,6 +35,9 @@ public class RestClientController {
@Autowired
private PredictionService predictionService;

@Autowired
private KafkaRequestResponseProducer kafkaProducer;

@Timed
@RequestMapping("/")
String home() {
Expand Down Expand Up @@ -56,10 +67,40 @@ public ResponseEntity<String> test(RequestEntity<String> requestEntity,Principal
String json = requestEntity.getBody();
logger.info(String.format("[%s] [%s] [%s] [%s]", "POST", requestEntity.getUrl().getPath(), clientId, json));

PredictionRequestDef request;
try
{
PredictionRequestDef.Builder builder = PredictionRequestDef.newBuilder();
ProtoBufUtils.updateMessageBuilderFromJson(builder, requestEntity.getBody() );
request = builder.build();
}
catch (InvalidProtocolBufferException e)
{
logger.error("Bad request",e);
throw new APIException(ApiExceptionType.APIFE_INVALID_JSON,requestEntity.getBody());
}

HttpStatus httpStatus = HttpStatus.OK;

// At present passes JSON string. Could use gRPC?
String ret = predictionService.predict(json,clientId);

PredictionResponseDef response;
try
{
PredictionResponseDef.Builder builder = PredictionResponseDef.newBuilder();
ProtoBufUtils.updateMessageBuilderFromJson(builder, ret);
response = builder.build();
}
catch (InvalidProtocolBufferException e)
{
logger.error("Bad response",e);
throw new APIException(ApiExceptionType.APIFE_INVALID_RESPONSE_JSON,requestEntity.getBody());
}

kafkaProducer.send(clientId,PredictionRequestResponseDef.newBuilder().setRequest(request).setResponse(response).build());


HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(MediaType.APPLICATION_JSON);
ResponseEntity<String> responseEntity = new ResponseEntity<String>(ret, responseHeaders, httpStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum ApiExceptionType {
APIFE_INVALID_JSON(101,"Invalid JSON",400),
APIFE_INVALID_ENDPOINT_URL(102,"Invalid Endpoint URL",500),
APIFE_MICROSERVICE_ERROR(103,"Microservice error",500),
APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500);
APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500),
APIFE_INVALID_RESPONSE_JSON(105,"Invalid Response JSON",400);

int id;
String message;
Expand Down
8 changes: 8 additions & 0 deletions api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.seldon.apife.kafka;

import java.util.Map;

public abstract class Adapter {
public void close() {}
public void configure(Map<String,?> configs, boolean isKey) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.seldon.apife.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;

@Component
public class KafkaRequestResponseProducer {

    // Topic that kafka.json pre-creates via KAFKA_CREATE_TOPICS ("predictions:1:1").
    // NOTE(review): send() currently publishes to a per-client topic (clientId),
    // not this constant — confirm which is intended; "predictions" is otherwise unused.
    private final static String topic = "predictions";

    private static Logger logger = LoggerFactory.getLogger(KafkaRequestResponseProducer.class.getName());

    /** Env var holding the bootstrap host:port (set in apife.json.in). */
    final private static String ENV_VAR_SELDON_KAFKA_SERVER = "SELDON_ENGINE_KAFKA_SERVER";

    private final KafkaProducer<String, PredictionRequestResponseDef> producer;

    /**
     * Builds the producer from the SELDON_ENGINE_KAFKA_SERVER environment
     * variable, falling back to localhost:9092 when unset.
     */
    public KafkaRequestResponseProducer()
    {
        String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
        logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
        if (kafkaHostPort == null) {
            // Fix: the warning previously named a non-existent variable
            // ("SELDON_KAFKA_SERVER"); report the variable actually read.
            logger.warn("*WARNING* " + ENV_VAR_SELDON_KAFKA_SERVER + " environment variable not set!");
            kafkaHostPort = "localhost:9092";
        }
        logger.info("Starting kafka client with server " + kafkaHostPort);
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaHostPort);
        props.put("client.id", "RequestResponseProducer");
        producer = new KafkaProducer<>(props, new StringSerializer(), new PredictionRequestResponseSerializer());
    }

    /**
     * Publishes a request/response pair, keyed by the response meta puid,
     * to a topic named after the client. Blocks until the broker acks
     * (synchronous get() on the send future).
     *
     * @param clientId used as the destination topic name
     * @param data the request/response pair to publish
     */
    public void send(String clientId, PredictionRequestResponseDef data)
    {
        try {
            producer.send(new ProducerRecord<>(clientId,
                    data.getResponse().getMeta().getPuid(),
                    data)).get();
        } catch (InterruptedException e) {
            // Fix: restore the interrupt flag instead of swallowing it,
            // and log through SLF4J rather than printStackTrace().
            Thread.currentThread().interrupt();
            logger.error("Interrupted while sending prediction to kafka", e);
        } catch (ExecutionException e) {
            // Fix: was e.printStackTrace() with a TODO; surface the failure
            // in the application log with its cause.
            logger.error("Failed to send prediction to kafka", e);
        }
    }

    /** Placeholder: topic creation is currently handled by the kafka.json deployment. */
    public void createTopic()
    {

    }
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.seldon.apife.kafka;

import org.apache.kafka.common.serialization.Serializer;

import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;

/**
 * Kafka value serializer that emits a {@code PredictionRequestResponseDef}
 * as its raw protobuf wire-format bytes. Lifecycle callbacks are inherited
 * as no-ops from {@link Adapter}.
 */
public class PredictionRequestResponseSerializer extends Adapter implements Serializer<PredictionRequestResponseDef> {

    @Override
    public byte[] serialize(final String topic, final PredictionRequestResponseDef data) {
        // Protobuf messages know their own encoding; hand back the wire format as-is.
        final byte[] payload = data.toByteArray();
        return payload;
    }

}
22 changes: 22 additions & 0 deletions kafka/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Kafka broker image built on Alpine + JRE (anapsix/alpine-java).
FROM anapsix/alpine-java

# Kafka/Scala versions are build args so the Makefile can override them.
ARG kafka_version=0.11.0.0
ARG scala_version=2.12

# docker + jq are used by the runtime scripts (e.g. topic creation helpers);
# wget/curl/coreutils support the download script below.
RUN apk add --update unzip wget curl docker jq coreutils

# Download the Kafka distribution, unpack it under /opt and symlink a
# version-independent /opt/kafka path for the startup scripts.
ENV KAFKA_VERSION=$kafka_version SCALA_VERSION=$scala_version
ADD scripts/download-kafka.sh /tmp/download-kafka.sh
RUN chmod a+x /tmp/download-kafka.sh && sync && /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka

# Data/log directory exposed so it can be bind-mounted or persisted.
VOLUME ["/kafka"]

ENV KAFKA_HOME /opt/kafka
ENV PATH ${PATH}:${KAFKA_HOME}/bin
ADD scripts/start-kafka.sh /usr/bin/start-kafka.sh
ADD scripts/create-topics.sh /usr/bin/create-topics.sh
# The scripts need to have executable permission
RUN chmod a+x /usr/bin/start-kafka.sh && \
    chmod a+x /usr/bin/create-topics.sh
# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
CMD ["start-kafka.sh"]
22 changes: 22 additions & 0 deletions kafka/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
DOCKER_IMAGE_NAME=seldonio/kafka-core
DOCKER_IMAGE_VERSION=0.1

build_docker_image:
mkdir -p .m2 && docker build --force-rm=true -t $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) .
# Push the image under the same name build_docker_image tagged it with.
# Fix: include $(PRIVATE_REPO) — the build target tags
# $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION), so pushing
# without the prefix referenced a tag that was never created.
push_to_registry:
	docker push $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)

start_kafka: kafka.json
kubectl apply -f kafka.json
stop_kafka: kafka.json
kubectl delete --ignore-not-found=true -f kafka.json


#update_proto:
# cp -v ../proto/seldonengine.proto src/main/proto/
# cp -v ../proto/prediction.proto src/main/proto/

port_forward_kafka:
POD_NAME=$$(kubectl --namespace default get pod -l app=kafka -o template --template="{{(index .items 0).metadata.name}}") && \
kubectl port-forward $${POD_NAME} 9093:9093

159 changes: 159 additions & 0 deletions kafka/kafka.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
{
"apiVersion": "v1",
"items": [
{
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"creationTimestamp": null,
"labels": {
"app": "kafka",
"service" : "seldon"
},
"name": "kafka"
},
"spec": {
"ports": [
{
"name": "kafka-port",
"port": 9092,
"protocol": "TCP",
"targetPort": 9092,
"nodePort": 30010
}
],
"selector": {
"app": "kafka"
},
"sessionAffinity": "None",
"type": "NodePort"
}
},
{
"kind": "Deployment",
"apiVersion": "extensions/v1beta1",
"metadata": {
"creationTimestamp": null,
"labels": {
"app": "kafka",
"service" : "seldon"
},
"name": "kafka"
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"app": "kafka"
}
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"app": "kafka"
}
},
"spec": {
"containers": [
{
"env": [
{
"name": "KAFKA_BROKER_ID",
"value": "1"
},
{
"name": "KAFKA_ADVERTISED_PROTOCOL_NAME",
"value": "PLAINTEXT"
},
{
"name": "KAFKA_ADVERTISED_PORT",
"value": "9092"
},
{
"name": "KAFKA_PROTOCOL_NAME",
"value": "EXTERNAL"
},
{
"name": "KAFKA_ADVERTISED_HOST_NAME",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "status.podIP"
}
}
},
{
"name": "ADD_LISTENER",
"value": "EXTERNAL://0.0.0.0:9093"
},
{
"name": "KAFKA_INTER_BROKER_LISTENER_NAME",
"value": "PLAINTEXT"
},
{
"name": "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"value": "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"
},
{
"name": "KAFKA_PORT",
"value": "9092"
},
{
"name": "KAFKA_HOST_NAME",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "status.podIP"
}
}
},
{
"name": "KAFKA_ZOOKEEPER_CONNECT",
"value": "zookeeper-1"
},
{
"name": "KAFKA_CREATE_TOPICS",
"value": "predictions:1:1"
},
{
"name": "KAFKA_LOG_RETENTION_HOURS",
"value": "2"
},
{
"name": "KAFKA_LOG_ROLL_HOURS",
"value": "2"
},
{
"name": "KAFKA_LOG_CLEANUP_POLICY",
"value": "delete"
},
{
"name": "KAFKA_LOG_CLEANER_ENABLE",
"value": "true"
}
],
"image": "seldonio/kafka-core:0.1",
"imagePullPolicy": "IfNotPresent",
"name": "kafka",
"ports": [
{
"containerPort": 9092,
"protocol": "TCP"
}
],
"resources": {},
"terminationMessagePath": "/dev/termination-log"
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Always",
"securityContext": {},
"terminationGracePeriodSeconds": 30
}
}
}
}
],
"kind": "List"
}
Loading

0 comments on commit 87dfccc

Please sign in to comment.