Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ShimmerBLEGrpc/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ Then, run ShimmerGRPC.java
# Changelog
v1.0.0
- Initial Release

v1.0.1
- Improve logging in ShimmerServer
- Improve thread safety in ShimmerBLEService
135 changes: 81 additions & 54 deletions ShimmerBLEGrpc/Sources/ShimmerBLEService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf

@MainActor
final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleServiceProtocol {

private var centralManager: CBCentralManager?
Expand All @@ -27,7 +28,7 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService
private var radioMap = [String: BleByteRadio]()

init() {
self.centralManager = CBCentralManager()
self.centralManager = CBCentralManager() // main queue by default
self.bluetoothManager = BluetoothManager(centralmanager: self.centralManager!)
bluetoothManager?.delegate = self
}
Expand All @@ -39,7 +40,13 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService
}

func writeBytesShimmer(request: ShimmerBLEGRPC_WriteBytes, context: GRPCCore.ServerContext) async throws -> ShimmerBLEGRPC_Reply {
radioMap[request.address]!.writeData(data: request.byteToWrite)
//Avoid force-unwrap and handle when device not connected
guard let radio = radioMap[request.address] else {
return ShimmerBLEGRPC_Reply.with {
$0.message = "Write failed: device \( request.address) not connected"
}
}
radio.writeData(data: request.byteToWrite)
return ShimmerBLEGRPC_Reply.with {
$0.message = "Written " + request.address
}
Expand All @@ -55,24 +62,33 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService

//Initiates a BLE scan first, before completing the process in startConnectShimmer()
func connectShimmer(request: ShimmerBLEGRPC_Request, response: GRPCCore.RPCWriter<ShimmerBLEGRPC_StateStatus>, context: GRPCCore.ServerContext) async throws {
if(!isConnecting) {
if !isConnecting {
deviceNameToConnect = request.name
isConnecting = true
print("Received connectShimmer request for: " + deviceNameToConnect)
var res = self.bluetoothManager?.startScanning(deviceName: self.deviceNameToConnect, timeout: 3)

// register the writer up front
connectStreamMap[deviceNameToConnect] = response
await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.connecting, message: "Connecting")

await writeStatusResponse(deviceName: deviceNameToConnect,
state: ShimmerBLEGRPC_BluetoothState.connecting,
message: "Connecting")

_ = bluetoothManager?.startScanning(deviceName: self.deviceNameToConnect, timeout: 3)

try await Task.sleep(for: .seconds(4))
while(bluetoothDeviceMap.keys.contains(deviceNameToConnect)) {
//this keeps the response GRPCCore.RPCWriter<> open
// Keep the writer open while the device stays connected (exists in map)
while bluetoothDeviceMap[deviceNameToConnect] != nil {
try await Task.sleep(for: .seconds(0.1)) //sleep 100ms
}
} else {
print("Received connectShimmer request for: " + deviceNameToConnect)
print("Error: connection attempt already in progress!")
await writeStatusResponseWithRPCWriter(state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Connection failed! Existing connection attempt in progress", writer: response)
await writeStatusResponseWithRPCWriter(
state: ShimmerBLEGRPC_BluetoothState.disconnected,
message: "Connection failed! Existing connection attempt in progress",
writer: response
)
}
}

Expand All @@ -82,16 +98,15 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService
}

private func writeStatusResponseWithRPCWriter(state: ShimmerBLEGRPC_BluetoothState, message: String, writer: GRPCCore.RPCWriter<ShimmerBLEGRPC_StateStatus>?) async {
if(writer != nil) {
let status = ShimmerBLEGRPC_StateStatus.with {
$0.state = state
$0.message = message
}
do {
try await writer?.write(status)
} catch let error {
print(error)
}
guard let writer = writer else { return }
let status = ShimmerBLEGRPC_StateStatus.with {
$0.state = state
$0.message = message
}
do {
try await writer.write(status)
} catch {
print("writeStatusResponse error:", error)
}
}

Expand All @@ -108,52 +123,64 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService

func getDataStream(request: ShimmerBLEGRPC_StreamRequest, response: GRPCCore.RPCWriter<ShimmerBLEGRPC_ObjectClusterByteArray>, context: GRPCCore.ServerContext) async throws {
print("Received getDataStream request for: " + request.message)
while(bluetoothDeviceMap.keys.contains(request.message)) {
if(queueMap.keys.contains(request.message)) {
while bluetoothDeviceMap[request.message] != nil {
if let q = queueMap[request.message] {
var data = Data()
while(!queueMap[request.message]!.isEmpty) {
data.append(queueMap[request.message]?.dequeue() ?? Data())
while !q.isEmpty {
data.append(q.dequeue() ?? Data())
}

let res = ShimmerBLEGRPC_ObjectClusterByteArray.with {
$0.bluetoothAddress = request.message
$0.binaryData = data
$0.calibratedTimeStamp = Double(Date().timeIntervalSince1970 * 1_000) //Unix timestamp in milliseconds

if !data.isEmpty {
let res = ShimmerBLEGRPC_ObjectClusterByteArray.with {
$0.bluetoothAddress = request.message
$0.binaryData = data
$0.calibratedTimeStamp = Double(Date().timeIntervalSince1970 * 1_000) //Unix timestamp in milliseconds
}
try await response.write(res)
}
try await response.write(res)
}

try await Task.sleep(for: .seconds(0.001)) //sleep 1ms
}

}

func startConnectShimmer() async {
let peripheral = bluetoothManager?.getPeripheral(deviceName: deviceNameToConnect)
if(peripheral != nil) {
//Always reset isConnecting at the end
defer { isConnecting = false }

guard let peripheral = bluetoothManager?.getPeripheral(deviceName: deviceNameToConnect) else {
await writeStatusResponse(deviceName: deviceNameToConnect,
state: ShimmerBLEGRPC_BluetoothState.disconnected,
message: "Failed to discover device")
return
}

//Create radio first, connect, and only after that register in maps
let radio = BleByteRadio(deviceName: deviceNameToConnect,
cbperipheral: peripheral,
bluetoothManager: bluetoothManager!)
radio.delegate = self

let success = await radio.connect()
if success ?? false {
bluetoothDeviceMap[deviceNameToConnect] = peripheral
queueMap[deviceNameToConnect] = ConcurrentQueue<Data>()

let radio = BleByteRadio(deviceName: deviceNameToConnect,cbperipheral: peripheral!,bluetoothManager: bluetoothManager!)
radio.delegate = self

let success = await radio.connect()
if(success ?? false) {
radioMap[deviceNameToConnect] = radio
await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.connected, message: "Success")
} else {
await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Radio failed to connect")
}
radioMap[deviceNameToConnect] = radio
await writeStatusResponse(deviceName: deviceNameToConnect,
state: ShimmerBLEGRPC_BluetoothState.connected,
message: "Success")
} else {
await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Failed to discover device")
await writeStatusResponse(deviceName: deviceNameToConnect,
state: ShimmerBLEGRPC_BluetoothState.disconnected,
message: "Radio failed to connect")
}

isConnecting = false
}

func startDisconnectShimmer(name: String) {
Task {
await radioMap[name]?.disconnect()
//Run the disconnect and cleanup on the main actor
Task { @MainActor in
if let radio = radioMap[name] {
await radio.disconnect()
}
bluetoothDeviceMap.removeValue(forKey: name)
connectStreamMap.removeValue(forKey: name)
queueMap.removeValue(forKey: name)
Expand All @@ -165,9 +192,9 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService

extension ShimmerBLEService : BluetoothManagerDelegate {
func scanCompleted() {
if(isConnecting) {
if isConnecting {
Task {
await startConnectShimmer()
await startConnectShimmer() // will hop onto the main actor
}
}
}
Expand All @@ -189,9 +216,9 @@ extension ShimmerBLEService : ByteCommunicationDelegate {
}

func byteCommunicationDataReceived(data: Data?, deviceName: String) {
let queue = queueMap[deviceName]
if(data != nil) {
queue?.enqueue(data ?? Data())
}
//Avoid optional chaining and ignore nil data
guard let data = data, let queue = queueMap[deviceName] else { return }
queue.enqueue(data)
}
}

26 changes: 24 additions & 2 deletions ShimmerBLEGrpc/Sources/ShimmerServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import ArgumentParser
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import Foundation
import Darwin


@main
struct ShimmerServer: AsyncParsableCommand {
Expand All @@ -18,7 +21,12 @@ struct ShimmerServer: AsyncParsableCommand {
var port: Int = 50052

func run() async throws {
let server = GRPCServer(
// Disable buffering so logs aren’t batched when stdout/stderr are pipes
setbuf(stdout, nil)
setbuf(stderr, nil)
installSignalHandlers()

let server = await GRPCServer(
transport: .http2NIOPosix(
address: .ipv4(host: "127.0.0.1", port: self.port),
transportSecurity: .plaintext
Expand All @@ -30,8 +38,22 @@ struct ShimmerServer: AsyncParsableCommand {
group.addTask { try await server.serve() }
if let address = try await server.listeningAddress {
print("Shimmer BLE gRPC listening on \(address)")
print("Server Version: v1.0.0")
print("Server Version: v1.0.1")
}
}
}

// Log termination signals to help diagnose unexpected exits
private func installSignalHandlers() {
let signals = [SIGTERM, SIGHUP, SIGINT, SIGQUIT]
for sig in signals {
signal(sig) { s in
let msg = "[Swift] received signal \( s)\n"
FileHandle.standardError.write(Data(msg.utf8))
// Ensure logs are flushed before exiting
fflush(stdout); fflush(stderr)
_exit(128 + s) // Conventional exit for signaled process
}
}
}
}
2 changes: 1 addition & 1 deletion ShimmerBLEGrpc/Sources/Subcommands/Serve.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct Serve: AsyncParsableCommand {
var port: Int = 50052

func run() async throws {
let server = GRPCServer(
let server = await GRPCServer(
transport: .http2NIOPosix(
address: .ipv4(host: "127.0.0.1", port: self.port),
transportSecurity: .plaintext
Expand Down
8 changes: 4 additions & 4 deletions ShimmerBluetooth/ShimmerBluetooth/BleByteRadio.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ public class BleByteRadio : NSObject, ByteCommunication {
public func writeBytes(bytes: [UInt8])->Bool {
let data = Data(bytes)
guard let char = self.characteristics[RBL_CHAR_TX_UUID] else { return false}
print("Write Data \(bytes)")
print(char.uuid.uuidString)
// print("Write Data \(bytes)")
// print(char.uuid.uuidString)
self.activePeripheral?.writeValue(data, for: char, type: .withResponse)
return true
}

public func writeData(data: Data) ->Bool {
guard let char = self.characteristics[RBL_CHAR_TX_UUID] else { return false}
print("[DEBUG] Write Data \(data)")
print(char.uuid.uuidString)
// print("[DEBUG] Write Data \(data)")
// print(char.uuid.uuidString)
self.activePeripheral?.writeValue(data, for: char, type: .withResponse)
return true
}
Expand Down