Skip to content

Commit

Permalink
added load generation service to better show load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed May 1, 2019
1 parent c54d6a0 commit e1363b1
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 10 deletions.
57 changes: 52 additions & 5 deletions client/client.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
<head>
<title>Addressing Demo</title>
<script src="rhea.js"></script>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
</head>

<body>
To: <input type="text" id="address" style="width:20%"/> Content: <input type="text" id="request" style="width:40%"/>
To: <input type="text" id="address" style="width:20%"/> Content: <input type="text" id="request" style="width:40%" onkeydown="if (event.keyCode == 13) document.getElementById('send').click()" />
<button id="send" style="width:10%">Send</button>
<button id="clear" style="width:10%">Clear</button>
<div id="responses"></div>
Expand All @@ -21,14 +22,53 @@
input.focus();
trigger.disabled = true;

function parse_function(s, msg) {
var result = s.match(/(.+)\((.*)\)/);
if (result) {
msg.subject = result[1];
msg.application_properties = result[2].split(',').reduce(function (o, s) { var kv = s.split('='); o[kv[0]] = kv[1]; return o; }, {});
}
}
function append (txt, colour) {
var div = document.createElement("div");
div.innerHTML = txt.fontcolor(colour);
output.append(div);
input.focus();
}
function append_chart (stats) {
var div = document.createElement("div");
output.append(div);
var names = [];
var values = [];
var colours = [];
for (var colour in stats) {
for (var name in stats[colour]) {
names.push(name);
values.push(stats[colour][name]);
colours.push(colour);
}
}
var layout = {
autosize: false,
width: 200*values.length,
height: 400
};
var data = [
{
x: names,
y: values,
marker:{
color: colours
},
type: 'bar'
}
];
Plotly.plot(div, data, layout, {displayModeBar: false, staticPlot: true});
}
trigger.onclick = function () {
connection.send({to:target.value, reply_to:myaddress, body:input.value});
var message = {to:target.value, reply_to:myaddress, body:input.value};
parse_function(input.value, message);
connection.send(message);
input.value = "";
};
clear.onclick = function () {
Expand All @@ -39,17 +79,24 @@

var client = require("rhea");
client.on("message", function (context) {
var colour = context.message.application_properties.colour.toString();
append(context.message.body + " <i>from " + context.message.reply_to + "</i>", colour);
var colour = context.message.application_properties && context.message.application_properties.colour ? context.message.application_properties.colour.toString() : 'black';
if (context.message.subject === 'stats' || typeof context.message.body === 'object' ) {
append_chart(context.message.body);
} else {
append(context.message.body + " <i>from " + context.message.reply_to + "</i>", colour);
}
});
var ws = client.websocket_connect(WebSocket);
var connection = client.connect({"connection_details":ws(server, ["binary", "AMQPWSB10", "amqp"]), "reconnect":false});
var connection = client.connect({"connection_details":ws(server, ["binary", "AMQPWSB10", "amqp"]), "reconnect":false, "container_id":"demo-client"});
var myaddress;
connection.open_receiver({source:{dynamic:true}});
client.on('receiver_open', function (context) {
myaddress = context.receiver.source.address;
trigger.disabled = false;
});
client.on('connection_open', function (context) {
document.title = 'Connected to ' + context.connection.container_id;
});

</script>
</body>
Expand Down
27 changes: 27 additions & 0 deletions deploy/loadgen/loadgen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: loadgen
spec:
replicas: 1
selector:
matchLabels:
application: loadgen
template:
metadata:
creationTimestamp: null
labels:
application: loadgen
spec:
containers:
- image: quay.io/gordons/address-demo-loadgen
imagePullPolicy: IfNotPresent
name: upper
volumeMounts:
- name: connect
mountPath: "/etc/messaging/"
readOnly: true
volumes:
- name: connect
secret:
secretName: connect-config
10 changes: 5 additions & 5 deletions deploy/service/yellow.yaml → deploy/service/pink.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: yellow-reverse
name: pink-reverse
spec:
replicas: 2
replicas: 1
selector:
matchLabels:
application: yellow-reverse
application: pink-reverse
template:
metadata:
creationTimestamp: null
labels:
application: yellow-reverse
application: pink-reverse
spec:
containers:
- env:
- name: COLOUR
value: yellow
value: pink
- name: OPERATION
value: reverse
- name: GROUP
Expand Down
36 changes: 36 additions & 0 deletions deploy/service/purple.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: purple-upper
spec:
replicas: 1
selector:
matchLabels:
application: purple-upper
template:
metadata:
creationTimestamp: null
labels:
application: purple-upper
spec:
containers:
- env:
- name: COLOUR
value: purple
- name: OPERATION
value: upper
- name: GROUP
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: quay.io/gordons/address-demo-service
imagePullPolicy: IfNotPresent
name: upper
volumeMounts:
- name: connect
mountPath: "/etc/messaging/"
readOnly: true
volumes:
- name: connect
secret:
secretName: connect-config
5 changes: 5 additions & 0 deletions loadgen/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM fedora:28

RUN dnf install -y python-qpid-proton && dnf clean all -y
ADD loadgen /
ENTRYPOINT ["/loadgen"]
3 changes: 3 additions & 0 deletions loadgen/connect.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"scheme": "amqp"
}
122 changes: 122 additions & 0 deletions loadgen/loadgen
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python

from __future__ import print_function, unicode_literals
import optparse, os, random, string
from proton import Message, Url
from proton.handlers import MessagingHandler
from proton.reactor import Container

class Client(MessagingHandler):
def __init__(self, address, rate):
super(Client, self).__init__()
self.address = address
self.rate = int(rate) or 5
self.stats = {}

def start(self, event):
self.receiver = event.container.create_receiver(event.connection, None, dynamic=True, handler=self)

def stop(self):
self.sender.close()
self.receiver.close()

def on_timer_task(self, event):
print("timer fired (rate=%s)" % self.rate)
sent = 0
while self.sender.credit and sent < self.rate:
self.sender.send(Message(reply_to=self.receiver.remote_source.address, correlation_id="responses", body='foobarbaz'))
sent += 1
print("sent %s" % sent)
if sent >= self.rate:
print("done")
else:
print("%s is less than %s!?!"% (sent, self.rate))
print("rescheduling")
event.container.schedule(1, self)

def on_link_opened(self, event):
if event.receiver == self.receiver:
self.sender = event.container.create_sender(event.connection, self.address, handler=self)
elif event.sender == self.sender:
event.container.schedule(1, self)

def increment_stats(self, index, key):
if not self.stats.has_key(index):
self.stats[index] = {}
if self.stats[index].has_key(key):
self.stats[index][key] += 1
else:
self.stats[index][key] = 1

def on_message(self, event):
if event.message.properties and "colour" in event.message.properties:
self.increment_stats(event.message.properties["colour"], event.message.reply_to)

class Service(MessagingHandler):
def __init__(self):
super(Service, self).__init__()
self.name = os.getenv("NAME", os.getenv("HOSTNAME"))
self.addresses = [self.name, "loadgen", "all"]
self.client = None
self.default_target = os.getenv("DEFAULT_TARGET", "upper")

def on_start(self, event):
event.container.container_id = self.name
conn = event.container.connect()
for address in set(self.addresses):
event.container.create_receiver(conn, address)
print("Listening on %s" % address)
self.sender = event.container.create_sender(conn, None)

def start(self, context, address, rate):
if self.client:
return False;
else:
self.client = Client(address, rate)
self.client.start(context)
return True

def stop(self):
if self.client:
self.client.stop()
self.client = None
return True;
else:
return False

def get_stats(self):
if self.client:
return self.client.stats
else:
return {}

def on_message(self, event):
reply = Message(address=event.message.reply_to,
reply_to=self.name,
correlation_id=event.message.correlation_id)

if event.message.subject == "start":
print('start requested: %s', event.message.properties)
if self.start(event, event.message.properties.get("target", self.default_target), event.message.properties.get("rate", 5)):
reply.body = "started"
else:
reply.body = "already running"
elif event.message.subject == "stop":
if self.stop():
reply.body = "stopped"
else:
reply.body = "not running"
elif event.message.subject == "get_stats":
reply.subject = "stats"
reply.body = self.get_stats()
else:
reply.body = event.message.body

self.sender.send(reply)

try:
Container(Service()).run()
except KeyboardInterrupt: pass



0 comments on commit e1363b1

Please sign in to comment.