In phase 1 I have implemented the simple webserver with routings . to receive device events and write these to Kafka and . to return charging state on request, some simple stream processing receiving device events from Kafka, and an in-memory data store for the charging state of the devices.
The data fields used in the various stages of processing are defined in the table below. From this table the following code is generated:
. The device event Pojo, . The Avro schema used in Kafka, . (Later) The Dao code used to store the device event data in an RDBMS, . The charging state Pojo.
Right now, the stream processing is simply extracting the charging state data from received device events and storing them in the in-memory store.
The in-memory store uses a concurrent hashmap, ensuring proper handling of multi-threaded calls.
The following table specifies the fields in the data that are received from the IoT device and are processed by our server.
| Field name | Avro type | Avro nullable | Java type | Where |
|---|---|---|---|---|
| device_id | string | t | String | depojo avro db |
| charging | int | T | int | depojo avro db cspojo |
| charging_source | string | T | String | depojo avro db cspojo |
| current_capacity | int | y | int | depojo avro db cspojo |
| moduleL_temp | int | y | int | depojo avro db |
| moduleR_temp | int | y | int | depojo avro db |
| processor1_temp | int | y | int | depojo avro db |
| processor2_temp | int | y | int | depojo avro db |
| processor3_temp | int | y | int | depojo avro db |
| processor4_temp | int | y | int | depojo avro db |
| inverter_state | int | y | int | depojo avro db |
| soc_regulator | float | y | float | depojo avro db |
| received_when | long | T | long | avro db |
| id | int | db |
(defun in-depojo-p (where)
"Return T when 'depojo' occurs in where."
(string-match-p (regexp-quote "depojo") where))
(defun in-avro-p (where)
"Return T when 'avro' occurs in where."
(string-match-p (regexp-quote "avro") where))
(defun in-db-p (where)
"Return T when 'db' occurs in where."
(string-match-p (regexp-quote "db") where))
(defun in-cspojo-p (where)
"Return T when 'cspojo' occurs in where."
(string-match-p (regexp-quote "cspojo") where))
(defun fieldname-col (row)
"Return the value from the Field name column in row."
(nth 0 row))
(defun avrotype-col (row)
"Return the value from the Avro type column in row."
(nth 1 row))
(defun avronullable-p (row)
"Return T when the value from the Avro nullable column in row contains T t Y or y."
(let ((ns (nth 2 row)))
(if (string-match-p "[TtYy]" ns)
ns
nil
)
))
(defun javatype-col (row)
"Return the value from the Jave type column in row."
(nth 3 row))
(defun where-col (row)
"Return the value from the Where column in row."
(nth 4 row))
(defun capfirst (text)
"Uppercase the first character in text."
(let ((frst (substring text 0 1))
(rest (substring text 1 nil)))
(concat
(upcase frst)
rest))) (defun deviceevent-pojo-fields (datarows)
"Generate the private data fields in the Pojo."
(let ((output ""))
(dotimes (i (length datarows) output)
(let ((rowi (nth i datarows)))
(let ((fieldname (fieldname-col rowi))
(javatype (javatype-col rowi))
(inpojo (in-depojo-p (where-col rowi))))
(if inpojo
(setq output
(concat
output
" private " javatype " " fieldname ";\n"
" @JsonProperty\n"
" public " javatype " get" (capfirst fieldname) "() {\n"
" return " fieldname ";\n"
" }\n\n"
))))))
output
))
(defun deviceevent-pojo-getters (datarows)
"Generate the Getters in the Pojo."
""
)
(defun generate-deviceevent-pojo (datarows)
(let ((depojo-output
(concat
"package com.example.ingestbattevents.api;\n"
"import com.fasterxml.jackson.annotation.JsonProperty;\n"
"\n"
"public class DeviceEvent {\n"
"\n"
" public DeviceEvent() {\n"
" // Jackson deserialization\n"
" }\n"
"\n"
(deviceevent-pojo-fields datarows)
"}\n"
)))
(with-temp-buffer
(let ((outfile "scaffold/src/main/java/api/DeviceEvent.java"))
(insert depojo-output)
(when (file-writable-p outfile)
(write-region (point-min)
(point-max)
outfile)))))) (defun deviceevent-avroschema-default (avrotype)
"Return the default value for the given AVRO type."
(cond
((string= avrotype "boolean") "false")
((string= avrotype "int" ) "0" )
((string= avrotype "long" ) "0" )
((string= avrotype "float" ) "0.0" )
((string= avrotype "double" ) "0.0" )
((string= avrotype "string" ) "\"\"" )
(t nil )
)
)
(defun deviceevent-avroschema-type (avrotype nullable)
"Return the type to be used in the AVRO schema."
(let ((atp (concat "\"" avrotype "\"")))
(if nullable
(concat "[" atp ", \"null\"]")
atp
)))
(defun deviceevent-avroschema-keyvalue (key value)
"Return a JSON formatted key-value pair."
(concat "\"" key "\": " value)
)
(defun deviceevent-avroschema-fields (datarows)
"Generate the data fields part of the schema."
(let ((output ""))
(dotimes (i (length datarows) output)
(let ((rowi (nth i datarows)))
(let ((fieldname (fieldname-col rowi))
(avrotype (avrotype-col rowi))
(nullable (avronullable-p rowi))
(inavro (in-avro-p (where-col rowi))))
(if inavro
(setq output
(concat
output
(if (> (length output) 0)
" ,"
" ")
"{\n"
" \"name\": \"" fieldname "\",\n"
;;" \"type\": [\"string\", \"null\"],\n"
;;" \"type\": " avrotype ",\n"
" " (deviceevent-avroschema-keyvalue "type" (deviceevent-avroschema-type avrotype nullable)) ",\n"
" " (deviceevent-avroschema-keyvalue "default" (deviceevent-avroschema-default avrotype)) "\n"
" }\n"
;; " private " javatype " " fieldname ";\n"
;; " @JsonProperty\n"
;; " public " javatype " get" (capfirst fieldname) "() {\n"
;; " return " fieldname ";\n"
;; " }\n\n"
))))))
output
))
(defun generate-deviceevent-avroschema (datarows)
(let ((avroschema-output
(concat
"{\n"
" \"namespace\": \"com.example.ingestbattevents.avro\",\n"
" \"type\": \"record\",\n"
" \"name\": \"DeviceEventAvro\",\n"
" \"fields\": [\n"
(deviceevent-avroschema-fields datarows)
;; " {\n"
;; " \"name\": \"device_id\",\n"
;; " \"type\": [\"string\", \"null\"],\n"
;; " \"default\": \"\"\n"
;; " }\n"
;; " ,{\n"
;; " \"name\": \"charging\",\n"
;; " \"type\": [\"int\",\"null\"],\n"
;; " \"default\": 0\n"
;; " }\n"
;; " ,{\n"
;; " \"name\": \"charging_source\",\n"
;; " \"type\": [\"string\", \"null\"],\n"
;; " \"default\": \"\"\n"
;; " }\n"
;; " ,{\n"
;; " \"name\": \"current_capacity\",\n"
;; " \"type\": [\"int\",\"null\"],\n"
;; " \"default\": 0\n"
;; " }\n"
" ]\n"
"}\n"
)))
(with-temp-buffer
(let ((outfile "scaffold/src/main/resources/avro/deviceevent.avsc"))
(insert avroschema-output)
(when (file-writable-p outfile)
(write-region (point-min)
(point-max)
outfile)))))) (defun generate-deviceevent-dbimodule (datarows)
(let ((dbimodule-output
(concat
"// =====================\n"
"// DeviceEvent Dbi module\n"
"// =====================\n"
"\n"
)))
(with-temp-buffer
(let ((outfile "scaffold/src/main/java/dbi/DeviceEvent.java"))
(insert dbimodule-output)
(when (file-writable-p outfile)
(write-region (point-min)
(point-max)
outfile)))))) (defun charging-state-pojo-ctor-args (datarows)
"Generate the arguments for the constructor in the Charging State Pojo."
(let ((output ""))
(dotimes (i (length datarows) output)
(let ((rowi (nth i datarows)))
(let ((fieldname (fieldname-col rowi))
(javatype (javatype-col rowi))
(inpojo (in-cspojo-p (where-col rowi))))
(if inpojo
(setq output
(concat
output
(if (not (string= "" output))
", ")
javatype " " fieldname))
))))
output
))
(defun charging-state-pojo-ctor-assis (datarows)
"Generate the constructor in the Charging State Pojo."
""
(let ((output ""))
(dotimes (i (length datarows) output)
(let ((rowi (nth i datarows)))
(let ((fieldname (fieldname-col rowi))
(javatype (javatype-col rowi))
(inpojo (in-cspojo-p (where-col rowi))))
(if inpojo
(setq output
(concat
output
" this." fieldname " = " fieldname ";\n"
))))))
output
))
(defun charging-state-pojo-fields (datarows)
"Generate the private data fields in the Charging State Pojo."
(let ((output ""))
(dotimes (i (length datarows) output)
(let ((rowi (nth i datarows)))
(let ((fieldname (fieldname-col rowi))
(javatype (javatype-col rowi))
(inpojo (in-cspojo-p (where-col rowi))))
(if inpojo
(setq output
(concat
output
" private " javatype " " fieldname ";\n"
" @JsonProperty\n"
" public " javatype " get" (capfirst fieldname) "() {\n"
" return " fieldname ";\n"
" }\n\n"
))))))
output
))
(defun generate-charging-state-pojo (datarows)
(let ((cspojo-output
(concat
"package api;\n"
"import com.fasterxml.jackson.annotation.JsonProperty;\n"
"\n"
"public class ChargingState {\n"
"\n"
" public ChargingState (" (charging-state-pojo-ctor-args datarows) ") {\n"
(charging-state-pojo-ctor-assis datarows)
" }\n\n"
(charging-state-pojo-fields datarows)
"}\n"
)))
(with-temp-buffer
(let ((outfile "scaffold/src/main/java/api/ChargingState.java"))
(insert cspojo-output)
(when (file-writable-p outfile)
(write-region (point-min)
(point-max)
outfile)))))) (let ((datarows (cdr tbl)))
(progn (generate-deviceevent-pojo datarows)
(generate-deviceevent-avroschema datarows)
;;(generate-deviceevent-dbimodule datarows)
(generate-charging-state-pojo datarows)
))Source File EventGenerator.java contains a hashmap with possible event fields:
(...)
// setup event field values
{
events.put("charging", new ImmutablePair<>(-1000, 1000));
events.put("charging_source", new ImmutablePair<>("solar", "utility"));
events.put("current_capacity", new ImmutablePair<>(0, 13_000));
// other fields like a real device would send
// events.put("moduleL_temp", new ImmutablePair<>(-5, 225));
// events.put("moduleR_temp", new ImmutablePair<>(-5, 225));
// events.put("processor1_temp", new ImmutablePair<>(-5, 225));
// events.put("processor2_temp", new ImmutablePair<>(-5, 225));
// events.put("processor3_temp", new ImmutablePair<>(-5, 225));
// events.put("processor4_temp", new ImmutablePair<>(-5, 225));
// events.put("inverter_state", new ImmutablePair<>(0, 15));
// events.put("SoC_regulator", new ImmutablePair<>(26.0f, 29.6f));
}
(...)The generator generates a random number of events with random field values. The event data is then formatted as a JSON array and POST-ed to the web-server.
$ cd (…)/event-generators $ mvn clean/compile/package $ java -jar target/event-generators-1.2-SNAPSHOT-jar-with-dependencies.jar events -e 1 –debug -t http://localhost:8080/device-events
#! /bin/bash java -jar target/event-generators-1.2-SNAPSHOT-jar-with-dependencies.jar events -e 1 –debug -t http://localhost:8080/device-events
File DeviceEventResource.java sets up the URL /device-events/{devid} for receiving device events with charging and device data for device with id devid. This handler expects an JSON array of device event records that are decoded into an ArrayList<DeviceEvent>.
$ cd (…)/scaffold $ mvn clean/compile/package $ java -jar target/energy-kafka-1.0-SNAPSHOT.jar server ingestbattevents.yml
#! /bin/bash java -jar target/energy-kafka-1.0-SNAPSHOT.jar server ingestbattevents.yml
File scaffold/src/main/resources/avro/devicebattevent.avsc defines the schema to use when sending/receiving device events to/from Kafka. It defines a simple record consisting of a list of fields enumerated in the tables above. This results in generated Java code in file scaffold/src/main/generated/com/example/ingestbattevents/avro/DeviceEventAvro.java that can be used for this purpose.
This is a full producer example. The data is actually written to Kafka in function sendDeviceEventToKafka in file scaffold/src/main/java/DeviceEventResource.java.
install Docker
Post-installation steps for Linux
bert@bert-K18Base:~$ sudo groupadd docker
groupadd: group ‘docker’ already exists
bert@bert-K18Base:~$ sudo usermod -aG docker
$ cd (…)/scaffold $ docker-compose -f docker-compose-kafka.yml up $# runs at localhost:29092, schema registry at localhost:8090
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ curl –silent -X GET http://localhost:8090/subjects/ | jq . [ “device-events-value” ] bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ curl –silent -X GET http://localhost:8090/subjects/device-events-value/versions/latest | jq . { “subject”: “device-events-value”, “version”: 1, “id”: 41, “schema”: “{"type":"record","name":"DeviceEventAvro","namespace":"com.example.ingestbattevents.avro","fields":[{"name":"device_id","type":["string","null"],"default":""},{"name":"charging","type":["int","null"],"default":0},{"name":"charging_source","type":["string","null"],"default":""},{"name":"current_capacity","type":["int","null"],"default":0}]}” }
#! /bin/bash cd scaffold docker-compose -f docker-compose-kafka.yml up
streams.StreamsConfiguration.java#schemaRegistry returns a map containing the URL of the schema resistry.
Look at this example how they use Avro Schema’s and serialisation/deserialisation. folder: KafkaStreamsTutorials/kafka-streams-examples file -> KafkaStreamsTutorials/kafka-streams-examples/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
streams.DeviceEventProcessing.java contains the setup of the device event processing topology. This also implements the DropWizard Managed interface, so that the stream can be started/stopped when the webserver starts/stops. This also calls on StreamsConfiguration to find out where the Kafka Brokers are and where the Schema Resistry can be found.
-> (…)/scaffold/README.md -> Dropwizard MySQL Integration Tutorial -> SQL AUTO INCREMENT Field
$ docker run –name manning-postgres -e POSTGRES_PASSWORD=secret -p 5432:5432 -d postgres:12.2 $ docker stop manning-postgres $ docker rm manning-postgres
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 4b5c7433998c postgres:12.2 “docker-entrypoint.s…” 8 minutes ago Up 8 minutes 0.0.0.0:5432->5432/tcp manning-postgres
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ docker exec -it manning-postgres psql -U postgres -c “SELECT schema_name FROM information_schema.schemata” psql: warning: extra command-line argument “FROM” ignored psql: warning: extra command-line argument “information_schema.schemata”” ignored psql: error: could not connect to server: FATAL: database “schema_name” does not exist
Create a new table in the public database:
bash $ docker exec -it manning-postgres psql -U postgres \ -c “CREATE TABLE devices (uuid varchar, state boolean)”
bash $ docker exec -it manning-postgres psql -U postgres -c ‘\dt’ List of relations Schema | Name | Type | Owner --------+---------+-------+---------- public | devices | table | postgres (1 row)
allowing to actually connect to the database
Add to ingestbattevents.yml
database: driverClass: org.postgresql.Driver user: postgres password: secret url: “jdbc:postgresql://0.0.0.0:5432/postgres”
$ nix-env -iA nixpkgs.mysql-workbench
$ mysql-workbench
CREATE TABLE device_events ( id INT NOT NULL AUTO_INCREMENT PRIMARY KEY ,charging INT ,charging_source VARCHAR(50) ,current_capacity INT );
$ docker run –name manning-mysql -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 -d mysql:8.0.22 $ docker stop manning-mysql $ docker rm manning-mysql
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ docker run –name manning-mysql -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 -d mysql:8.0.22 Unable to find image ‘mysql:8.0.22’ locally 8.0.22: Pulling from library/mysql 852e50cd189d: Pull complete 29969ddb0ffb: Pull complete a43f41a44c48: Pull complete 5cdd802543a3: Pull complete b79b040de953: Pull complete 938c64119969: Pull complete 7689ec51a0d9: Pull complete a880ba7c411f: Pull complete 984f656ec6ca: Pull complete 9f497bce458a: Pull complete b9940f97694b: Pull complete 2f069358dc96: Pull complete Digest: sha256:4bb2e81a40e9d0d59bd8e3dc2ba5e1f2197696f6de39a91e90798dd27299b093 Status: Downloaded newer image for mysql:8.0.22 a97b4bb397956ff8f30da99c4d5e87d70a07bc7a693aebc884dc719a6393d94a
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a97b4bb39795 mysql:8.0.22 “docker-entrypoint.s…” 35 seconds ago Up 33 seconds 0.0.0.0:3306->3306/tcp, 33060/tcp manning-mysql
bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$ docker exec -it manning-mysql mysql -P 3306 -u root –password=secret mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 8 Server version: 8.0.22 MySQL Community Server - GPL
Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners.
Type ‘help;’ or ‘\h’ for help. Type ‘\c’ to clear the current input statement.
mysql> \h
For information about MySQL products and services, visit: http://www.mysql.com/ For developer information, including the MySQL Reference Manual, visit: http://dev.mysql.com/ To buy MySQL Enterprise support, training, or other products, visit: https://shop.mysql.com/
List of all MySQL commands: Note that all text commands must be first on line and end with ‘;’ ? (\?) Synonym for `help’. clear (\c) Clear the current input statement. connect (\r) Reconnect to the server. Optional arguments are db and host. delimiter (\d) Set statement delimiter. edit (\e) Edit command with $EDITOR. ego (\G) Send command to mysql server, display result vertically. exit (\q) Exit mysql. Same as quit. go (\g) Send command to mysql server. help (\h) Display this help. nopager (\n) Disable pager, print to stdout. notee (\t) Don’t write into outfile. pager (\P) Set PAGER [to_pager]. Print the query results via PAGER. print (\p) Print current command. prompt (\R) Change your mysql prompt. quit (\q) Quit mysql. rehash (\#) Rebuild completion hash. source (.) Execute an SQL script file. Takes a file name as an argument. status (\s) Get status information from the server. system (\!) Execute a system shell command. tee (\T) Set outfile [to_outfile]. Append everything into given outfile. use (\u) Use another database. Takes database name as argument. charset (\C) Switch to another charset. Might be needed for processing binlog with multi-byte charsets. warnings (\W) Show warnings after every statement. nowarning (\w) Don’t show warnings after every statement. resetconnection(\x) Clean session context.
For server side help, type ‘help contents’
mysql> \q Bye bert@bert-K18Base:~/DistributedGridProject/manning-energy-resources/scaffold$
#! /bin/bash docker run –name manning-mysql -e MYSQL_ROOT_PASSWORD=secret -p 3306:3306 mysql:8.0.22
$ docker exec -it manning-mysql mysql -P 3306 -u root –password=secret
Enable the mysql dependency in the pom, adding the client libraries to the project, allowing you to actually connect to the database
Add to ingestbattevents.yml
database: driverClass: com.mysql.cj.jdbc.Driver user: root password: secret url: “jdbc:mysql://0.0.0.0:3306/information_schema”
-> Class ConcurrentHashMap<K,V> See files scaffold/src/main/java/api/ChargingState.java and scaffold/src/main/java/dbi/ChargingStateStore.java.
Dropwizard Core DropWizard Managed Objects Dropwizard Tutorial – Hello World Example Irontest example
Apache Avro™ 1.10.0 Getting Started (Java) Docs Build Applications for Kafka -> Schema Management (Confluent) Schema Registry Tutorials / On-Premises Schema Registry Tutorial (Confluent)
MySQL Documentation DBeaver Community Edition 7.3.1 MySQL Workbench