diff --git a/.gitignore b/.gitignore
index 4a4a5f411d..129fb808ee 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,3 +39,5 @@ tests/out.log
cluster-manager/settings.sh
engine/target/
zookeeper-k8s/zookeeper.json
+kafka/tests/src/prediction_pb2.py
+kafka/tests/src/seldonengine_pb2.py
diff --git a/api-frontend/apife.json.in b/api-frontend/apife.json.in
index 736229655f..18c8a11abe 100644
--- a/api-frontend/apife.json.in
+++ b/api-frontend/apife.json.in
@@ -23,6 +23,10 @@
"name": "SELDON_ENGINE_ZK_SERVERS",
"value": "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181"
},
+ {
+ "name": "SELDON_ENGINE_KAFKA_SERVER",
+ "value": "kafka:9092"
+ },
{
"name": "SELDON_CLUSTER_MANAGER_REDIS_HOST",
"value": "redis"
diff --git a/api-frontend/pom.xml b/api-frontend/pom.xml
index 556bceb04e..6813cb2ee1 100644
--- a/api-frontend/pom.xml
+++ b/api-frontend/pom.xml
@@ -196,6 +196,11 @@
       <version>0.0.23</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.11.0.0</version>
+    </dependency>
diff --git a/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java b/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java
index e1da543e85..92de9399b0 100644
--- a/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java
+++ b/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java
@@ -16,8 +16,16 @@
import org.springframework.web.bind.annotation.RestController;
import com.codahale.metrics.annotation.Timed;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.seldon.apife.exception.APIException;
+import io.seldon.apife.exception.APIException.ApiExceptionType;
+import io.seldon.apife.kafka.KafkaRequestResponseProducer;
+import io.seldon.apife.pb.ProtoBufUtils;
import io.seldon.apife.service.PredictionService;
+import io.seldon.protos.PredictionProtos.PredictionRequestDef;
+import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;
+import io.seldon.protos.PredictionProtos.PredictionResponseDef;
@RestController
public class RestClientController {
@@ -27,6 +35,9 @@ public class RestClientController {
@Autowired
private PredictionService predictionService;
+ @Autowired
+ private KafkaRequestResponseProducer kafkaProducer;
+
@Timed
@RequestMapping("/")
String home() {
@@ -56,10 +67,40 @@ public ResponseEntity<String> test(RequestEntity<String> requestEntity,Principal
String json = requestEntity.getBody();
logger.info(String.format("[%s] [%s] [%s] [%s]", "POST", requestEntity.getUrl().getPath(), clientId, json));
+ PredictionRequestDef request;
+ try
+ {
+ PredictionRequestDef.Builder builder = PredictionRequestDef.newBuilder();
+ ProtoBufUtils.updateMessageBuilderFromJson(builder, requestEntity.getBody());
+ request = builder.build();
+ }
+ catch (InvalidProtocolBufferException e)
+ {
+ logger.error("Bad request",e);
+ throw new APIException(ApiExceptionType.APIFE_INVALID_JSON,requestEntity.getBody());
+ }
+
HttpStatus httpStatus = HttpStatus.OK;
+ // At present the raw JSON string is passed through to the prediction service; this could use gRPC instead.
String ret = predictionService.predict(json,clientId);
+ PredictionResponseDef response;
+ try
+ {
+ PredictionResponseDef.Builder builder = PredictionResponseDef.newBuilder();
+ ProtoBufUtils.updateMessageBuilderFromJson(builder, ret);
+ response = builder.build();
+ }
+ catch (InvalidProtocolBufferException e)
+ {
+ logger.error("Bad response",e);
+ throw new APIException(ApiExceptionType.APIFE_INVALID_RESPONSE_JSON,requestEntity.getBody());
+ }
+
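+ // Publish the paired request and response to Kafka for downstream consumers (e.g. offline logging and analytics).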
+ kafkaProducer.send(clientId,PredictionRequestResponseDef.newBuilder().setRequest(request).setResponse(response).build());
+
+
HttpHeaders responseHeaders = new HttpHeaders();
responseHeaders.setContentType(MediaType.APPLICATION_JSON);
 ResponseEntity<String> responseEntity = new ResponseEntity<String>(ret, responseHeaders, httpStatus);
diff --git a/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java b/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java
index 42ad7162a2..b1e5da7282 100644
--- a/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java
+++ b/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java
@@ -30,7 +30,8 @@ public enum ApiExceptionType {
APIFE_INVALID_JSON(101,"Invalid JSON",400),
APIFE_INVALID_ENDPOINT_URL(102,"Invalid Endpoint URL",500),
APIFE_MICROSERVICE_ERROR(103,"Microservice error",500),
- APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500);
+ APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500),
+ APIFE_INVALID_RESPONSE_JSON(105,"Invalid Response JSON",400);
int id;
String message;
diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java b/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java
new file mode 100644
index 0000000000..d7a3a520ce
--- /dev/null
+++ b/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java
@@ -0,0 +1,8 @@
+package io.seldon.apife.kafka;
+
+import java.util.Map;
+
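+/**
+ * No-op base class providing the default close()/configure() methods of Kafka's
+ * Serializer interface, so concrete serializers need only implement serialize().
+ */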
+public abstract class Adapter {
+ public void close() {}
+ public void configure(Map<String, ?> configs, boolean isKey) {}
+}
\ No newline at end of file
diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java b/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java
new file mode 100644
index 0000000000..d637fadb0f
--- /dev/null
+++ b/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java
@@ -0,0 +1,56 @@
+package io.seldon.apife.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;
+
+@Component
+public class KafkaRequestResponseProducer {
+ private KafkaProducer<String, PredictionRequestResponseDef> producer;
+ private final static String topic = "predictions";
+
+ private static Logger logger = LoggerFactory.getLogger(KafkaRequestResponseProducer.class.getName());
+ final private static String ENV_VAR_SELDON_KAFKA_SERVER = "SELDON_ENGINE_KAFKA_SERVER";
+
+ public KafkaRequestResponseProducer()
+ {
+ String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER);
+ logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort));
+ if (kafkaHostPort == null) {
+ logger.warn("*WARNING* " + ENV_VAR_SELDON_KAFKA_SERVER + " environment variable not set!");
+ kafkaHostPort = "localhost:9092";
+ }
+ logger.info("Starting kafka client with server "+kafkaHostPort);
+ Properties props = new Properties();
+ props.put("bootstrap.servers", kafkaHostPort);
+ props.put("client.id", "RequestResponseProducer");
+ producer = new KafkaProducer<>(props, new StringSerializer(), new PredictionRequestResponseSerializer());
+ }
+
+ public void send(String clientId, PredictionRequestResponseDef data)
+ {
+ try {
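+ // The topic is the client/deployment id and the record is keyed by the
+ // prediction puid; get() blocks until the broker acknowledges the send.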
+ producer.send(new ProducerRecord<>(clientId,
+ data.getResponse().getMeta().getPuid(),
+ data)).get();
+ } catch (InterruptedException | ExecutionException e) {
+ logger.error("Failed to send prediction request/response to Kafka", e);
+ }
+ }
+
+ public void createTopic()
+ {
+
+ }
+}
+
+
diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java b/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java
new file mode 100644
index 0000000000..8c2d1d0d2a
--- /dev/null
+++ b/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java
@@ -0,0 +1,14 @@
+package io.seldon.apife.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef;
+
+public class PredictionRequestResponseSerializer extends Adapter implements Serializer<PredictionRequestResponseDef> {
+
+ @Override
+ public byte[] serialize(final String topic, final PredictionRequestResponseDef data) {
+ return data.toByteArray();
+ }
+
+}
diff --git a/kafka/Dockerfile b/kafka/Dockerfile
new file mode 100644
index 0000000000..3d83137d82
--- /dev/null
+++ b/kafka/Dockerfile
@@ -0,0 +1,22 @@
+FROM anapsix/alpine-java
+
+ARG kafka_version=0.11.0.0
+ARG scala_version=2.12
+
+RUN apk add --update unzip wget curl docker jq coreutils
+
+ENV KAFKA_VERSION=$kafka_version SCALA_VERSION=$scala_version
+ADD scripts/download-kafka.sh /tmp/download-kafka.sh
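+# Download the Kafka distribution from the closest Apache mirror and unpack it to /opt/kafka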
+RUN chmod a+x /tmp/download-kafka.sh && sync && /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka
+
+VOLUME ["/kafka"]
+
+ENV KAFKA_HOME /opt/kafka
+ENV PATH ${PATH}:${KAFKA_HOME}/bin
+ADD scripts/start-kafka.sh /usr/bin/start-kafka.sh
+ADD scripts/create-topics.sh /usr/bin/create-topics.sh
+# The scripts need to have executable permission
+RUN chmod a+x /usr/bin/start-kafka.sh && \
+ chmod a+x /usr/bin/create-topics.sh
+# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown)
+CMD ["start-kafka.sh"]
diff --git a/kafka/Makefile b/kafka/Makefile
new file mode 100644
index 0000000000..3f0ebbb70e
--- /dev/null
+++ b/kafka/Makefile
@@ -0,0 +1,22 @@
+DOCKER_IMAGE_NAME=seldonio/kafka-core
+DOCKER_IMAGE_VERSION=0.1
+
+build_docker_image:
+ mkdir -p .m2 && docker build --force-rm=true -t $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) .
+push_to_registry:
+ docker push $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
+
+start_kafka: kafka.json
+ kubectl apply -f kafka.json
+stop_kafka: kafka.json
+ kubectl delete --ignore-not-found=true -f kafka.json
+
+
+#update_proto:
+# cp -v ../proto/seldonengine.proto src/main/proto/
+# cp -v ../proto/prediction.proto src/main/proto/
+
+port_forward_kafka:
+ POD_NAME=$$(kubectl --namespace default get pod -l app=kafka -o template --template="{{(index .items 0).metadata.name}}") && \
+ kubectl port-forward $${POD_NAME} 9093:9093
+
diff --git a/kafka/kafka.json b/kafka/kafka.json
new file mode 100644
index 0000000000..abf56d2c68
--- /dev/null
+++ b/kafka/kafka.json
@@ -0,0 +1,159 @@
+{
+ "apiVersion": "v1",
+ "items": [
+ {
+ "apiVersion": "v1",
+ "kind": "Service",
+ "metadata": {
+ "creationTimestamp": null,
+ "labels": {
+ "app": "kafka",
+ "service" : "seldon"
+ },
+ "name": "kafka"
+ },
+ "spec": {
+ "ports": [
+ {
+ "name": "kafka-port",
+ "port": 9092,
+ "protocol": "TCP",
+ "targetPort": 9092,
+ "nodePort": 30010
+ }
+ ],
+ "selector": {
+ "app": "kafka"
+ },
+ "sessionAffinity": "None",
+ "type": "NodePort"
+ }
+ },
+ {
+ "kind": "Deployment",
+ "apiVersion": "extensions/v1beta1",
+ "metadata": {
+ "creationTimestamp": null,
+ "labels": {
+ "app": "kafka",
+ "service" : "seldon"
+ },
+ "name": "kafka"
+ },
+ "spec": {
+ "replicas": 1,
+ "selector": {
+ "matchLabels": {
+ "app": "kafka"
+ }
+ },
+ "template": {
+ "metadata": {
+ "creationTimestamp": null,
+ "labels": {
+ "app": "kafka"
+ }
+ },
+ "spec": {
+ "containers": [
+ {
+ "env": [
+ {
+ "name": "KAFKA_BROKER_ID",
+ "value": "1"
+ },
+ {
+ "name": "KAFKA_ADVERTISED_PROTOCOL_NAME",
+ "value": "PLAINTEXT"
+ },
+ {
+ "name": "KAFKA_ADVERTISED_PORT",
+ "value": "9092"
+ },
+ {
+ "name": "KAFKA_PROTOCOL_NAME",
+ "value": "EXTERNAL"
+ },
+ {
+ "name": "KAFKA_ADVERTISED_HOST_NAME",
+ "valueFrom": {
+ "fieldRef": {
+ "apiVersion": "v1",
+ "fieldPath": "status.podIP"
+ }
+ }
+ },
+ {
+ "name": "ADD_LISTENER",
+ "value": "EXTERNAL://0.0.0.0:9093"
+ },
+ {
+ "name": "KAFKA_INTER_BROKER_LISTENER_NAME",
+ "value": "PLAINTEXT"
+ },
+ {
+ "name": "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+ "value": "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL"
+ },
+ {
+ "name": "KAFKA_PORT",
+ "value": "9092"
+ },
+ {
+ "name": "KAFKA_HOST_NAME",
+ "valueFrom": {
+ "fieldRef": {
+ "apiVersion": "v1",
+ "fieldPath": "status.podIP"
+ }
+ }
+ },
+ {
+ "name": "KAFKA_ZOOKEEPER_CONNECT",
+ "value": "zookeeper-1"
+ },
+ {
+ "name": "KAFKA_CREATE_TOPICS",
+ "value": "predictions:1:1"
+ },
+ {
+ "name": "KAFKA_LOG_RETENTION_HOURS",
+ "value": "2"
+ },
+ {
+ "name": "KAFKA_LOG_ROLL_HOURS",
+ "value": "2"
+ },
+ {
+ "name": "KAFKA_LOG_CLEANUP_POLICY",
+ "value": "delete"
+ },
+ {
+ "name": "KAFKA_LOG_CLEANER_ENABLE",
+ "value": "true"
+ }
+ ],
+ "image": "seldonio/kafka-core:0.1",
+ "imagePullPolicy": "IfNotPresent",
+ "name": "kafka",
+ "ports": [
+ {
+ "containerPort": 9092,
+ "protocol": "TCP"
+ }
+ ],
+ "resources": {},
+ "terminationMessagePath": "/dev/termination-log"
+ }
+ ],
+ "dnsPolicy": "ClusterFirst",
+ "restartPolicy": "Always",
+ "securityContext": {},
+ "terminationGracePeriodSeconds": 30
+ }
+ }
+ }
+ }
+ ],
+ "kind": "List"
+}
diff --git a/kafka/scripts/create-topics.sh b/kafka/scripts/create-topics.sh
new file mode 100755
index 0000000000..34945b32d0
--- /dev/null
+++ b/kafka/scripts/create-topics.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+
+
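+# Wait for the broker to listen on KAFKA_PORT, then create the topics listed in
+# KAFKA_CREATE_TOPICS: comma-separated "name:partitions:replicas[:cleanup.policy]"
+# entries, e.g. "predictions:1:1".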
+if [[ -z "$START_TIMEOUT" ]]; then
+ START_TIMEOUT=600
+fi
+
+start_timeout_exceeded=false
+count=0
+step=10
+while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do
+ echo "waiting for kafka to be ready"
+ sleep $step;
+ count=$(expr $count + $step)
+ if [ $count -gt $START_TIMEOUT ]; then
+ start_timeout_exceeded=true
+ break
+ fi
+done
+
+if $start_timeout_exceeded; then
+ echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
+ exit 1
+fi
+
+if [[ -n $KAFKA_CREATE_TOPICS ]]; then
+ IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do
+ echo "creating topics: $topicToCreate"
+ IFS=':' read -a topicConfig <<< "$topicToCreate"
+ if [ ${topicConfig[3]} ]; then
+ JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --config cleanup.policy="${topicConfig[3]}" --if-not-exists
+ else
+ JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --if-not-exists
+ fi
+ done
+fi
diff --git a/kafka/scripts/download-kafka.sh b/kafka/scripts/download-kafka.sh
new file mode 100755
index 0000000000..2ddc911ea4
--- /dev/null
+++ b/kafka/scripts/download-kafka.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred')
+url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
+wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
diff --git a/kafka/scripts/start-kafka.sh b/kafka/scripts/start-kafka.sh
new file mode 100755
index 0000000000..76b613c35f
--- /dev/null
+++ b/kafka/scripts/start-kafka.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+if [[ -z "$KAFKA_PORT" ]]; then
+ export KAFKA_PORT=9092
+fi
+
+create-topics.sh &
+
+if [[ -z "$KAFKA_ADVERTISED_PORT" && \
+ -z "$KAFKA_LISTENERS" && \
+ -z "$KAFKA_ADVERTISED_LISTENERS" && \
+ -S /var/run/docker.sock ]]; then
+ export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g")
+fi
+if [[ -z "$KAFKA_BROKER_ID" ]]; then
+ if [[ -n "$BROKER_ID_COMMAND" ]]; then
+ export KAFKA_BROKER_ID=$(eval $BROKER_ID_COMMAND)
+ else
+ # By default auto allocate broker ID
+ export KAFKA_BROKER_ID=-1
+ fi
+fi
+if [[ -z "$KAFKA_LOG_DIRS" ]]; then
+ export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
+fi
+if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
+ export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,)
+fi
+
+if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
+ sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
+ unset KAFKA_HEAP_OPTS
+fi
+
+if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then
+ export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND)
+fi
+
+#DEFAULT LISTENERS
+export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT-$KAFKA_PORT}"
+export KAFKA_LISTENERS="PLAINTEXT://${KAFKA_HOST_NAME-}:${KAFKA_PORT-9092}"
+
+if [[ -n "$ADD_LISTENER" && -n "$KAFKA_LISTENERS" ]]; then
+ export KAFKA_LISTENERS="${KAFKA_LISTENERS},${ADD_LISTENER}"
+fi
+
+if [[ -n "$ADD_LISTENER" && -z "$KAFKA_LISTENERS" ]]; then
+ export KAFKA_LISTENERS="${ADD_LISTENER}"
+fi
+
+if [[ -n "$ADD_LISTENER" && -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
+ export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${ADD_LISTENER}"
+fi
+
+if [[ -n "$ADD_LISTENER" && -z "$KAFKA_ADVERTISED_LISTENERS" ]]; then
+ export KAFKA_ADVERTISED_LISTENERS="${ADD_LISTENER}"
+fi
+
+
+if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
+ export KAFKA_BROKER_RACK=$(eval $RACK_COMMAND)
+fi
+
+#Issue newline to config file in case there is not one already
+echo -e "\n" >> $KAFKA_HOME/config/server.properties
+
+unset KAFKA_CREATE_TOPICS
+unset KAFKA_ADVERTISED_PROTOCOL_NAME
+unset KAFKA_PROTOCOL_NAME
+
+if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
+ unset KAFKA_ADVERTISED_PORT
+ unset KAFKA_ADVERTISED_HOST_NAME
+fi
+
+if [[ -n "$KAFKA_LISTENERS" ]]; then
+ unset KAFKA_PORT
+ unset KAFKA_HOST_NAME
+fi
+
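+# Translate environment variables of the form KAFKA_FOO_BAR into foo.bar entries
+# in server.properties (and LOG4J_* into log4j.properties), overriding existing
+# keys and appending any that are missing.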
+for VAR in `env`
+do
+ if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
+ kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+ env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+ if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
+ sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
+ else
+ echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
+ fi
+ fi
+
+ if [[ $VAR =~ ^LOG4J_ ]]; then
+ log4j_name=`echo "$VAR" | sed -r "s/(LOG4J_.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
+ log4j_env=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
+ if egrep -q "(^|^#)$log4j_name=" $KAFKA_HOME/config/log4j.properties; then
+ sed -r -i "s@(^|^#)($log4j_name)=(.*)@\2=${!log4j_env}@g" $KAFKA_HOME/config/log4j.properties #note that no config values may contain an '@' char
+ else
+ echo "$log4j_name=${!log4j_env}" >> $KAFKA_HOME/config/log4j.properties
+ fi
+ fi
+done
+
+if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
+ eval $CUSTOM_INIT_SCRIPT
+fi
+
+exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
diff --git a/kafka/tests/Makefile b/kafka/tests/Makefile
new file mode 100644
index 0000000000..0c7b88722b
--- /dev/null
+++ b/kafka/tests/Makefile
@@ -0,0 +1,10 @@
+SHELL=/bin/bash
+
+
+compile_proto:
+ python -m grpc.tools.protoc -I ../../proto --python_out=src/ --grpc_python_out=src/ ../../proto/prediction.proto
+ python -m grpc.tools.protoc -I ../../proto --python_out=src/ --grpc_python_out=src/ ../../proto/seldonengine.proto
+
+
+
+
diff --git a/kafka/tests/src/read_predictions.py b/kafka/tests/src/read_predictions.py
new file mode 100644
index 0000000000..7f3f546544
--- /dev/null
+++ b/kafka/tests/src/read_predictions.py
@@ -0,0 +1,30 @@
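+# Consume serialized PredictionRequestResponseDef messages from a Kafka topic and
+# print them. Example (with `make port_forward_kafka` forwarding port 9093):
+#   python read_predictions.py --kafka localhost:9093 --topic <topic>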
+import argparse
+import logging
+from kafka import KafkaConsumer
+from kafka import TopicPartition
+import prediction_pb2 as ppb
+
+if __name__ == '__main__':
+ logger = logging.getLogger()
+ logging.basicConfig(format='%(asctime)s : %(levelname)s : %(name)s : %(message)s', level=logging.DEBUG)
+ logger.setLevel(logging.INFO)
+
+ parser = argparse.ArgumentParser(prog='read_predictions')
+ parser.add_argument('--kafka', help='kafka endpoint', default="localhost:9093")
+ parser.add_argument('--topic', help='kafka topic', required=True)
+
+ args = parser.parse_args()
+
+ consumer = KafkaConsumer(client_id="py-kafka",group_id=None,bootstrap_servers=args.kafka)
+ partition = TopicPartition(args.topic, 0)
+ consumer.assign([partition])
+ consumer.seek(partition, 0)
+ for msg in consumer:
+ print(msg)
+ message = ppb.PredictionRequestResponseDef()
+ message.ParseFromString(msg.value)
+ print(message)
diff --git a/proto/prediction.proto b/proto/prediction.proto
index ec6b36b7ae..3822d59029 100644
--- a/proto/prediction.proto
+++ b/proto/prediction.proto
@@ -104,6 +104,11 @@ message PredictionFeedbackDef {
// [END Feedback]
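+// A prediction request paired with its corresponding response, as published to Kafka by the API frontend.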
+message PredictionRequestResponseDef {
+ PredictionRequestDef request = 1;
+ PredictionResponseDef response = 2;
+}
+
diff --git a/start-all b/start-all
index 51268c07ca..7fa51f9ca3 100755
--- a/start-all
+++ b/start-all
@@ -8,6 +8,7 @@ STARTUP_DIR="$( cd "$( dirname "$0" )" && pwd )"
cd "${STARTUP_DIR}/redis-memonly" && make start_redis
cd "${STARTUP_DIR}/zookeeper-k8s" && make start_zookeeper
+cd "${STARTUP_DIR}/kafka" && make start_kafka
cd "${STARTUP_DIR}/cluster-manager" && make start_cluster_manager
cd "${STARTUP_DIR}/api-frontend" && make start_apife
diff --git a/stop-all b/stop-all
index 8ab7f99d71..ea7d6155bf 100755
--- a/stop-all
+++ b/stop-all
@@ -8,6 +8,8 @@ STARTUP_DIR="$( cd "$( dirname "$0" )" && pwd )"
cd "${STARTUP_DIR}/api-frontend" && make stop_apife
cd "${STARTUP_DIR}/cluster-manager" && make stop_cluster_manager
+cd "${STARTUP_DIR}/kafka" && make stop_kafka
cd "${STARTUP_DIR}/zookeeper-k8s" && make stop_zookeeper
cd "${STARTUP_DIR}/redis-memonly" && make stop_redis
+
diff --git a/tests/run_mean_deploy_test.sh b/tests/run_mean_deploy_test.sh
index 807b94e6e9..c7207907b4 100755
--- a/tests/run_mean_deploy_test.sh
+++ b/tests/run_mean_deploy_test.sh
@@ -3,8 +3,8 @@
make cm_create_deployment
until make cm_check_deployment_ready
do
- echo "Waiting for deployment to be ready"
- sleep 1
+ echo "Waiting 5 secs for deployment to be ready"
+ sleep 5
done
sleep 10
make api_post_test