This module is the Kafka producer for the Intelligent Agent System layer of the CReATE Fleet Management System. It is designed to run on a System on Module (SoM) embedded in a vehicle, where it collects sensor and telemetry data and publishes it to Kafka. Downstream services can then consume this data for further processing and analytics.
- Reads data from serial port and virtual CAN network interface.
- Parses NMEA sentence and OBD2 CAN frames.
- Publishes data to Kafka topics.
- Supports Protocol Buffers.
- Integrates with Schema Registry.
- Pipeline concurrency.
1. Clone the repository
git clone <the repository url>
cd intelligent-agent-system2. Compile the Protocol Buffers
protoc \
--go_out=. --go_opt=module=github.com/jojohimawan/intelligent-agent-system \
--go-grpc_out=. --go-grpc_opt=module=github.com/jojohimawan/intelligent-agent-system \
api/location.proto3. Create a virtual CAN interface
sudo modprobe vcan
sudo ip link add dev vcan0 type vcan
sudo ip link set up vcan04. Verify Kafka and Schema Registry connectivity
Ensure that:
- Kafka broker is running and accessible.
- Schema Registry is up and reachable.
5. Run the server
go run cmd/server/main.go6. Test with sending CAN frame
cansend vcan0 98DAF115#04410C1770000000- Uses pipeline concurrency model with independent goroutines for reading, parsing, and publishing.
- Each stage logs recoverable errors without blocking the pipeline.
- Additional data sources or stages can be added.
- Graceful shutdown by context cancellation and Kafka/serial connection cleanup.
- Go >= 1.24
- Kafka Broker
- Confluent Schema Registry
- Serial Device (e.g. Arduino Uno, GPS module emitting NMEA sentences)
- Virtual CAN Network Interface (e.g. Linux's can-utils)
Multi-topic publishing.- Worker pool for Kafka publishing.
Graceful shutdown.VCAN connection cleanup.VCAN connection error handling.Environment variables adjustment.
Authored by Jordan Himawan.
Cyber Security Research Group, C304 - D4 Building.
Politeknik Elektronika Negeri Surabaya.Last change: December 1st, 2025.