From 87dfccc4e5f6f9a3f99c125f1bb1fd7def1cade7 Mon Sep 17 00:00:00 2001 From: Clive Cox Date: Mon, 28 Aug 2017 10:05:04 +0100 Subject: [PATCH] initial kafka integration --- .gitignore | 2 + api-frontend/apife.json.in | 4 + api-frontend/pom.xml | 5 + .../apife/api/rest/RestClientController.java | 41 +++++ .../seldon/apife/exception/APIException.java | 3 +- .../java/io/seldon/apife/kafka/Adapter.java | 8 + .../kafka/KafkaRequestResponseProducer.java | 56 ++++++ .../PredictionRequestResponseSerializer.java | 14 ++ kafka/Dockerfile | 22 +++ kafka/Makefile | 22 +++ kafka/kafka.json | 159 ++++++++++++++++++ kafka/scripts/create-topics.sh | 36 ++++ kafka/scripts/download-kafka.sh | 5 + kafka/scripts/start-kafka.sh | 108 ++++++++++++ kafka/tests/Makefile | 10 ++ kafka/tests/src/read_predictions.py | 30 ++++ proto/prediction.proto | 5 + start-all | 1 + stop-all | 2 + tests/run_mean_deploy_test.sh | 4 +- 20 files changed, 534 insertions(+), 3 deletions(-) create mode 100644 api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java create mode 100644 api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java create mode 100644 api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java create mode 100644 kafka/Dockerfile create mode 100644 kafka/Makefile create mode 100644 kafka/kafka.json create mode 100755 kafka/scripts/create-topics.sh create mode 100755 kafka/scripts/download-kafka.sh create mode 100755 kafka/scripts/start-kafka.sh create mode 100644 kafka/tests/Makefile create mode 100644 kafka/tests/src/read_predictions.py diff --git a/.gitignore b/.gitignore index 4a4a5f411d..129fb808ee 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,5 @@ tests/out.log cluster-manager/settings.sh engine/target/ zookeeper-k8s/zookeeper.json +kafka/tests/src/prediction_pb2.py +kafka/tests/src/seldonengine_pb2.py diff --git a/api-frontend/apife.json.in b/api-frontend/apife.json.in index 736229655f..18c8a11abe 100644 --- a/api-frontend/apife.json.in +++ b/api-frontend/apife.json.in @@ -23,6 +23,10 @@ "name": "SELDON_ENGINE_ZK_SERVERS", "value": "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181" }, + { + "name": "SELDON_ENGINE_KAFKA_SERVER", + "value": "kafka:9092" + }, { "name": "SELDON_CLUSTER_MANAGER_REDIS_HOST", "value": "redis" diff --git a/api-frontend/pom.xml b/api-frontend/pom.xml index 556bceb04e..6813cb2ee1 100644 --- a/api-frontend/pom.xml +++ b/api-frontend/pom.xml @@ -196,6 +196,11 @@ 0.0.23 + + org.apache.kafka + kafka-clients + 0.11.0.0 + diff --git a/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java b/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java index e1da543e85..92de9399b0 100644 --- a/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java +++ b/api-frontend/src/main/java/io/seldon/apife/api/rest/RestClientController.java @@ -16,8 +16,16 @@ import org.springframework.web.bind.annotation.RestController; import com.codahale.metrics.annotation.Timed; +import com.google.protobuf.InvalidProtocolBufferException; +import io.seldon.apife.exception.APIException; +import io.seldon.apife.exception.APIException.ApiExceptionType; +import io.seldon.apife.kafka.KafkaRequestResponseProducer; +import io.seldon.apife.pb.ProtoBufUtils; import io.seldon.apife.service.PredictionService; +import io.seldon.protos.PredictionProtos.PredictionRequestDef; +import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef; +import io.seldon.protos.PredictionProtos.PredictionResponseDef; @RestController public class RestClientController { @@ -27,6 +35,9 @@ public class RestClientController { @Autowired private PredictionService predictionService; + @Autowired + private KafkaRequestResponseProducer kafkaProducer; + @Timed @RequestMapping("/") String home() { @@ -56,10 +67,40 @@ public ResponseEntity test(RequestEntity requestEntity,Principal String json = requestEntity.getBody(); logger.info(String.format("[%s] [%s] [%s] [%s]", "POST", requestEntity.getUrl().getPath(), clientId, json)); + PredictionRequestDef request; + try + { + PredictionRequestDef.Builder builder = PredictionRequestDef.newBuilder(); + ProtoBufUtils.updateMessageBuilderFromJson(builder, requestEntity.getBody() ); + request = builder.build(); + } + catch (InvalidProtocolBufferException e) + { + logger.error("Bad request",e); + throw new APIException(ApiExceptionType.APIFE_INVALID_JSON,requestEntity.getBody()); + } + HttpStatus httpStatus = HttpStatus.OK; + // At present passes JSON string. Could use gRPC? String ret = predictionService.predict(json,clientId); + PredictionResponseDef response; + try + { + PredictionResponseDef.Builder builder = PredictionResponseDef.newBuilder(); + ProtoBufUtils.updateMessageBuilderFromJson(builder, ret); + response = builder.build(); + } + catch (InvalidProtocolBufferException e) + { + logger.error("Bad response",e); + throw new APIException(ApiExceptionType.APIFE_INVALID_RESPONSE_JSON,requestEntity.getBody()); + } + + kafkaProducer.send(clientId,PredictionRequestResponseDef.newBuilder().setRequest(request).setResponse(response).build()); + + HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.setContentType(MediaType.APPLICATION_JSON); ResponseEntity responseEntity = new ResponseEntity(ret, responseHeaders, httpStatus); diff --git a/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java b/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java index 42ad7162a2..b1e5da7282 100644 --- a/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java +++ b/api-frontend/src/main/java/io/seldon/apife/exception/APIException.java @@ -30,7 +30,8 @@ public enum ApiExceptionType { APIFE_INVALID_JSON(101,"Invalid JSON",400), APIFE_INVALID_ENDPOINT_URL(102,"Invalid Endpoint URL",500), APIFE_MICROSERVICE_ERROR(103,"Microservice error",500), - APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500); + APIFE_NO_RUNNING_DEPLOYMENT(104,"No Running Deployment",500), + APIFE_INVALID_RESPONSE_JSON(105,"Invalid Response JSON",400); int id; String message; diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java b/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java new file mode 100644 index 0000000000..d7a3a520ce --- /dev/null +++ b/api-frontend/src/main/java/io/seldon/apife/kafka/Adapter.java @@ -0,0 +1,8 @@ +package io.seldon.apife.kafka; + +import java.util.Map; + +public abstract class Adapter { + public void close() {} + public void configure(Map configs, boolean isKey) {} +} \ No newline at end of file diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java b/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java new file mode 100644 index 0000000000..d637fadb0f --- /dev/null +++ b/api-frontend/src/main/java/io/seldon/apife/kafka/KafkaRequestResponseProducer.java @@ -0,0 +1,56 @@ +package io.seldon.apife.kafka; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef; + +@Component +public class KafkaRequestResponseProducer { + private KafkaProducer producer; + private final static String topic = "predictions"; + + private static Logger logger = LoggerFactory.getLogger(KafkaRequestResponseProducer.class.getName()); + final private static String ENV_VAR_SELDON_KAFKA_SERVER = "SELDON_ENGINE_KAFKA_SERVER"; + + public KafkaRequestResponseProducer() + { + String kafkaHostPort = System.getenv(ENV_VAR_SELDON_KAFKA_SERVER); + logger.info(String.format("using %s[%s]", ENV_VAR_SELDON_KAFKA_SERVER, kafkaHostPort)); + if (kafkaHostPort == null) { + logger.warn("*WARNING* SELDON_KAFKA_SERVER environment variable not set!"); + kafkaHostPort = "localhost:9092"; + } + logger.info("Starting kafka client with server "+kafkaHostPort); + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaHostPort); + props.put("client.id", "RequestResponseProducer"); + producer = new KafkaProducer<>(props, new StringSerializer(), new PredictionRequestResponseSerializer()); + } + + public void send(String clientId, PredictionRequestResponseDef data) + { + try { + producer.send(new ProducerRecord<>(clientId, + data.getResponse().getMeta().getPuid(), + data)).get(); + } catch (InterruptedException | ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void createTopic() + { + + } +} + + diff --git a/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java b/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java new file mode 100644 index 0000000000..8c2d1d0d2a --- /dev/null +++ b/api-frontend/src/main/java/io/seldon/apife/kafka/PredictionRequestResponseSerializer.java @@ -0,0 +1,14 @@ +package io.seldon.apife.kafka; + +import org.apache.kafka.common.serialization.Serializer; + +import io.seldon.protos.PredictionProtos.PredictionRequestResponseDef; + +public class PredictionRequestResponseSerializer extends Adapter implements Serializer { + + @Override + public byte[] serialize(final String topic, final PredictionRequestResponseDef data) { + return data.toByteArray(); + } + +} diff --git a/kafka/Dockerfile b/kafka/Dockerfile new file mode 100644 index 0000000000..3d83137d82 --- /dev/null +++ b/kafka/Dockerfile @@ -0,0 +1,22 @@ +FROM anapsix/alpine-java + +ARG kafka_version=0.11.0.0 +ARG scala_version=2.12 + +RUN apk add --update unzip wget curl docker jq coreutils + +ENV KAFKA_VERSION=$kafka_version SCALA_VERSION=$scala_version +ADD scripts/download-kafka.sh /tmp/download-kafka.sh +RUN chmod a+x /tmp/download-kafka.sh && sync && /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka + +VOLUME ["/kafka"] + +ENV KAFKA_HOME /opt/kafka +ENV PATH ${PATH}:${KAFKA_HOME}/bin +ADD scripts/start-kafka.sh /usr/bin/start-kafka.sh +ADD scripts/create-topics.sh /usr/bin/create-topics.sh +# The scripts need to have executable permission +RUN chmod a+x /usr/bin/start-kafka.sh && \ + chmod a+x /usr/bin/create-topics.sh +# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown) +CMD ["start-kafka.sh"] diff --git a/kafka/Makefile b/kafka/Makefile new file mode 100644 index 0000000000..3f0ebbb70e --- /dev/null +++ b/kafka/Makefile @@ -0,0 +1,22 @@ +DOCKER_IMAGE_NAME=seldonio/kafka-core +DOCKER_IMAGE_VERSION=0.1 + +build_docker_image: + mkdir -p .m2 && docker build --force-rm=true -t $(PRIVATE_REPO)$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) . +push_to_registry: + docker push $(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) + +start_kafka: kafka.json + kubectl apply -f kafka.json +stop_kafka: kafka.json + kubectl delete --ignore-not-found=true -f kafka.json + + +#update_proto: +# cp -v ../proto/seldonengine.proto src/main/proto/ +# cp -v ../proto/prediction.proto src/main/proto/ + +port_forward_kafka: + POD_NAME=$$(kubectl --namespace default get pod -l app=kafka -o template --template="{{(index .items 0).metadata.name}}") && \ + kubectl port-forward $${POD_NAME} 9093:9093 + diff --git a/kafka/kafka.json b/kafka/kafka.json new file mode 100644 index 0000000000..abf56d2c68 --- /dev/null +++ b/kafka/kafka.json @@ -0,0 +1,159 @@ +{ + "apiVersion": "v1", + "items": [ + { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "creationTimestamp": null, + "labels": { + "app": "kafka", + "service" : "seldon" + }, + "name": "kafka" + }, + "spec": { + "ports": [ + { + "name": "kafka-port", + "port": 9092, + "protocol": "TCP", + "targetPort": 9092, + "nodePort": 30010 + } + ], + "selector": { + "app": "kafka" + }, + "sessionAffinity": "None", + "type": "NodePort" + } + }, + { + "kind": "Deployment", + "apiVersion": "extensions/v1beta1", + "metadata": { + "creationTimestamp": null, + "labels": { + "app": "kafka", + "service" : "seldon" + }, + "name": "kafka" + }, + "spec": { + "replicas": 1, + "selector": { + "matchLabels": { + "app": "kafka" + } + }, + "template": { + "metadata": { + "creationTimestamp": null, + "labels": { + "app": "kafka" + } + }, + "spec": { + "containers": [ + { + "env": [ + { + "name": "KAFKA_BROKER_ID", + "value": "1" + }, + { + "name": "KAFKA_ADVERTISED_PROTOCOL_NAME", + "value": "PLAINTEXT" + }, + { + "name": "KAFKA_ADVERTISED_PORT", + "value": "9092" + }, + { + "name": "KAFKA_PROTOCOL_NAME", + "value": "EXTERNAL" + }, + { + "name": "KAFKA_ADVERTISED_HOST_NAME", + "valueFrom": { + "fieldRef": { + "apiVersion": "v1", + "fieldPath": "status.podIP" + } + } + }, + { + "name": "ADD_LISTENER", + "value": "EXTERNAL://0.0.0.0:9093" + }, + { + "name": "KAFKA_INTER_BROKER_LISTENER_NAME", + "value": "PLAINTEXT" + }, + { + "name": "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", + "value": "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL" + }, + { + "name": "KAFKA_PORT", + "value": "9092" + }, + { + "name": "KAFKA_HOST_NAME", + "valueFrom": { + "fieldRef": { + "apiVersion": "v1", + "fieldPath": "status.podIP" + } + } + }, + { + "name": "KAFKA_ZOOKEEPER_CONNECT", + "value": "zookeeper-1" + }, + { + "name": "KAFKA_CREATE_TOPICS", + "value": "predictions:1:1" + }, + { + "name": "KAFKA_LOG_RETENTION_HOURS", + "value": "2" + }, + { + "name": "KAFKA_LOG_ROLL_HOURS", + "value": "2" + }, + { + "name": "KAFKA_LOG_CLEANUP_POLICY", + "value": "delete" + }, + { + "name": "KAFKA_LOG_CLEANER_ENABLE", + "value": "true" + } + ], + "image": "seldonio/kafka-core:0.1", + "imagePullPolicy": "IfNotPresent", + "name": "kafka", + "ports": [ + { + "containerPort": 9092, + "protocol": "TCP" + } + ], + "resources": {}, + "terminationMessagePath": "/dev/termination-log" + } + ], + "dnsPolicy": "ClusterFirst", + "restartPolicy": "Always", + "securityContext": {}, + "terminationGracePeriodSeconds": 30 + } + } + } + } + ], + "kind": "List" +} diff --git a/kafka/scripts/create-topics.sh b/kafka/scripts/create-topics.sh new file mode 100755 index 0000000000..34945b32d0 --- /dev/null +++ b/kafka/scripts/create-topics.sh @@ -0,0 +1,36 @@ +#!/bin/bash + + +if [[ -z "$START_TIMEOUT" ]]; then + START_TIMEOUT=600 +fi + +start_timeout_exceeded=false +count=0 +step=10 +while netstat -lnt | awk '$4 ~ /:'$KAFKA_PORT'$/ {exit 1}'; do + echo "waiting for kafka to be ready" + sleep $step; + count=$(expr $count + $step) + if [ $count -gt $START_TIMEOUT ]; then + start_timeout_exceeded=true + break + fi +done + +if $start_timeout_exceeded; then + echo "Not able to auto-create topic (waited for $START_TIMEOUT sec)" + exit 1 +fi + +if [[ -n $KAFKA_CREATE_TOPICS ]]; then + IFS=','; for topicToCreate in $KAFKA_CREATE_TOPICS; do + echo "creating topics: $topicToCreate" + IFS=':' read -a topicConfig <<< "$topicToCreate" + if [ ${topicConfig[3]} ]; then + JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --config cleanup.policy="${topicConfig[3]}" --if-not-exists + else + JMX_PORT='' $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper $KAFKA_ZOOKEEPER_CONNECT --replication-factor ${topicConfig[2]} --partitions ${topicConfig[1]} --topic "${topicConfig[0]}" --if-not-exists + fi + done +fi diff --git a/kafka/scripts/download-kafka.sh b/kafka/scripts/download-kafka.sh new file mode 100755 index 0000000000..2ddc911ea4 --- /dev/null +++ b/kafka/scripts/download-kafka.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +mirror=$(curl --stderr /dev/null https://www.apache.org/dyn/closer.cgi\?as_json\=1 | jq -r '.preferred') +url="${mirror}kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" +wget -q "${url}" -O "/tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" diff --git a/kafka/scripts/start-kafka.sh b/kafka/scripts/start-kafka.sh new file mode 100755 index 0000000000..76b613c35f --- /dev/null +++ b/kafka/scripts/start-kafka.sh @@ -0,0 +1,108 @@ +#!/bin/bash + +if [[ -z "$KAFKA_PORT" ]]; then + export KAFKA_PORT=9092 +fi + +create-topics.sh & + +if [[ -z "$KAFKA_ADVERTISED_PORT" && \ + -z "$KAFKA_LISTENERS" && \ + -z "$KAFKA_ADVERTISED_LISTENERS" && \ + -S /var/run/docker.sock ]]; then + export KAFKA_ADVERTISED_PORT=$(docker port `hostname` $KAFKA_PORT | sed -r "s/.*:(.*)/\1/g") +fi +if [[ -z "$KAFKA_BROKER_ID" ]]; then + if [[ -n "$BROKER_ID_COMMAND" ]]; then + export KAFKA_BROKER_ID=$(eval $BROKER_ID_COMMAND) + else + # By default auto allocate broker ID + export KAFKA_BROKER_ID=-1 + fi +fi +if [[ -z "$KAFKA_LOG_DIRS" ]]; then + export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME" +fi +if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then + export KAFKA_ZOOKEEPER_CONNECT=$(env | grep ZK.*PORT_2181_TCP= | sed -e 's|.*tcp://||' | paste -sd ,) +fi + +if [[ -n "$KAFKA_HEAP_OPTS" ]]; then + sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh + unset KAFKA_HEAP_OPTS +fi + +if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" && -n "$HOSTNAME_COMMAND" ]]; then + export KAFKA_ADVERTISED_HOST_NAME=$(eval $HOSTNAME_COMMAND) +fi + +#DEFAULT LISTENERS +export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME-}:${KAFKA_ADVERTISED_PORT-$KAFKA_PORT}" +export KAFKA_LISTENERS="PLAINTEXT://${KAFKA_HOST_NAME-}:${KAFKA_PORT-9092}" + +if [[ -n "$ADD_LISTENER" && -n "$KAFKA_LISTENERS" ]]; then + export KAFKA_LISTENERS="${KAFKA_LISTENERS},${ADD_LISTENER}" +fi + +if [[ -n "$ADD_LISTENER" && -z "$KAFKA_LISTENERS" ]]; then + export KAFKA_LISTENERS="${ADD_LISTENER}" +fi + +if [[ -n "$ADD_LISTENER" && -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then + export KAFKA_ADVERTISED_LISTENERS="${KAFKA_ADVERTISED_LISTENERS},${ADD_LISTENER}" +fi + +if [[ -n "$ADD_LISTENER" && -z "$KAFKA_ADVERTISED_LISTENERS" ]]; then + export KAFKA_ADVERTISED_LISTENERS="${ADD_LISTENER}" +fi + + +if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then + export KAFKA_BROKER_RACK=$(eval $RACK_COMMAND) +fi + +#Issue newline to config file in case there is not one already +echo -e "\n" >> $KAFKA_HOME/config/server.properties + +unset KAFKA_CREATE_TOPICS +unset KAFKA_ADVERTISED_PROTOCOL_NAME +unset KAFKA_PROTOCOL_NAME + +if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then + unset KAFKA_ADVERTISED_PORT + unset KAFKA_ADVERTISED_HOST_NAME +fi + +if [[ -n "$KAFKA_LISTENERS" ]]; then + unset KAFKA_PORT + unset KAFKA_HOST_NAME +fi + +for VAR in `env` +do + if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then + kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .` + env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"` + if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then + sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char + else + echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties + fi + fi + + if [[ $VAR =~ ^LOG4J_ ]]; then + log4j_name=`echo "$VAR" | sed -r "s/(LOG4J_.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .` + log4j_env=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"` + if egrep -q "(^|^#)$log4j_name=" $KAFKA_HOME/config/log4j.properties; then + sed -r -i "s@(^|^#)($log4j_name)=(.*)@\2=${!log4j_env}@g" $KAFKA_HOME/config/log4j.properties #note that no config values may contain an '@' char + else + echo "$log4j_name=${!log4j_env}" >> $KAFKA_HOME/config/log4j.properties + fi + fi +done + +if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then + eval $CUSTOM_INIT_SCRIPT +fi + +exec $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties diff --git a/kafka/tests/Makefile b/kafka/tests/Makefile new file mode 100644 index 0000000000..0c7b88722b --- /dev/null +++ b/kafka/tests/Makefile @@ -0,0 +1,10 @@ +SHELL=/bin/bash + + +compile_proto: + python -m grpc.tools.protoc -I ../../proto --python_out=src/ --grpc_python_out=src/ ../../proto/prediction.proto + python -m grpc.tools.protoc -I ../../proto --python_out=src/ --grpc_python_out=src/ ../../proto/seldonengine.proto + + + + diff --git a/kafka/tests/src/read_predictions.py b/kafka/tests/src/read_predictions.py new file mode 100644 index 0000000000..7f3f546544 --- /dev/null +++ b/kafka/tests/src/read_predictions.py @@ -0,0 +1,30 @@ +import os +import sys, getopt, argparse +import logging +import json +from kafka import KafkaConsumer +from kafka import TopicPartition +import prediction_pb2 as ppb + +if __name__ == '__main__': + import logging + logger = logging.getLogger() + logging.basicConfig(format='%(asctime)s : %(levelname)s : %(name)s : %(message)s', level=logging.DEBUG) + logger.setLevel(logging.INFO) + + parser = argparse.ArgumentParser(prog='read_predictions') + parser.add_argument('--kafka', help='kafka endpoint', default="localhost:9093") + parser.add_argument('--topic', help='kafka topic', required=True) + + args = parser.parse_args() + opts = vars(args) + + consumer = KafkaConsumer(client_id="py-kafka",group_id=None,bootstrap_servers=args.kafka) + partition = TopicPartition(args.topic, 0) + consumer.assign([partition]) + consumer.seek(partition, 0) + for msg in consumer: + print msg + message = ppb.PredictionRequestResponseDef() + message.ParseFromString(msg.value) + print message diff --git a/proto/prediction.proto b/proto/prediction.proto index ec6b36b7ae..3822d59029 100644 --- a/proto/prediction.proto +++ b/proto/prediction.proto @@ -104,6 +104,11 @@ message PredictionFeedbackDef { // [END Feedback] +message PredictionRequestResponseDef { + PredictionRequestDef request = 1; + PredictionResponseDef response = 2; +} + diff --git a/start-all b/start-all index 51268c07ca..7fa51f9ca3 100755 --- a/start-all +++ b/start-all @@ -8,6 +8,7 @@ STARTUP_DIR="$( cd "$( dirname "$0" )" && pwd )" cd "${STARTUP_DIR}/redis-memonly" && make start_redis cd "${STARTUP_DIR}/zookeeper-k8s" && make start_zookeeper +cd "${STARTUP_DIR}/kafka" && make start_kafka cd "${STARTUP_DIR}/cluster-manager" && make start_cluster_manager cd "${STARTUP_DIR}/api-frontend" && make start_apife diff --git a/stop-all b/stop-all index 8ab7f99d71..ea7d6155bf 100755 --- a/stop-all +++ b/stop-all @@ -8,6 +8,8 @@ STARTUP_DIR="$( cd "$( dirname "$0" )" && pwd )" cd "${STARTUP_DIR}/api-frontend" && make stop_apife cd "${STARTUP_DIR}/cluster-manager" && make stop_cluster_manager +cd "${STARTUP_DIR}/kafka" && make stop_kafka cd "${STARTUP_DIR}/zookeeper-k8s" && make stop_zookeeper cd "${STARTUP_DIR}/redis-memonly" && make stop_redis + diff --git a/tests/run_mean_deploy_test.sh b/tests/run_mean_deploy_test.sh index 807b94e6e9..c7207907b4 100755 --- a/tests/run_mean_deploy_test.sh +++ b/tests/run_mean_deploy_test.sh @@ -3,8 +3,8 @@ make cm_create_deployment until make cm_check_deployment_ready do - echo "Waiting for deployment to be ready" - sleep 1 + echo "Waiting 5 secs for deployment to be ready" + sleep 5 done sleep 10 make api_post_test