diff --git a/ShimmerBLEGrpc/Readme.md b/ShimmerBLEGrpc/Readme.md index 3c21e6e..715ab5a 100644 --- a/ShimmerBLEGrpc/Readme.md +++ b/ShimmerBLEGrpc/Readme.md @@ -9,3 +9,7 @@ Then, run ShimmerGRPC.java # Changelog v1.0.0 - Initial Release + +v1.0.1 +- Improve logging in ShimmerServer +- Improve thread safety in ShimmerBLEService diff --git a/ShimmerBLEGrpc/Sources/ShimmerBLEService.swift b/ShimmerBLEGrpc/Sources/ShimmerBLEService.swift index 3874b6e..f1ab0bf 100644 --- a/ShimmerBLEGrpc/Sources/ShimmerBLEService.swift +++ b/ShimmerBLEGrpc/Sources/ShimmerBLEService.swift @@ -13,6 +13,7 @@ import GRPCCore import GRPCNIOTransportHTTP2 import GRPCProtobuf +@MainActor final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleServiceProtocol { private var centralManager: CBCentralManager? @@ -27,7 +28,7 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService private var radioMap = [String: BleByteRadio]() init() { - self.centralManager = CBCentralManager() + self.centralManager = CBCentralManager() // main queue by default self.bluetoothManager = BluetoothManager(centralmanager: self.centralManager!) bluetoothManager?.delegate = self } @@ -39,7 +40,13 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService } func writeBytesShimmer(request: ShimmerBLEGRPC_WriteBytes, context: GRPCCore.ServerContext) async throws -> ShimmerBLEGRPC_Reply { - radioMap[request.address]!.writeData(data: request.byteToWrite) + //Avoid force-unwrap and handle when device not connected + guard let radio = radioMap[request.address] else { + return ShimmerBLEGRPC_Reply.with { + $0.message = "Write failed: device \( request.address) not connected" + } + } + radio.writeData(data: request.byteToWrite) return ShimmerBLEGRPC_Reply.with { $0.message = "Written " + request.address } @@ -55,24 +62,33 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService //Initiates a BLE scan first, before completing the process in startConnectShimmer() func connectShimmer(request: ShimmerBLEGRPC_Request, response: GRPCCore.RPCWriter, context: GRPCCore.ServerContext) async throws { - if(!isConnecting) { + if !isConnecting { deviceNameToConnect = request.name isConnecting = true print("Received connectShimmer request for: " + deviceNameToConnect) - var res = self.bluetoothManager?.startScanning(deviceName: self.deviceNameToConnect, timeout: 3) + // register the writer up front connectStreamMap[deviceNameToConnect] = response - await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.connecting, message: "Connecting") + + await writeStatusResponse(deviceName: deviceNameToConnect, + state: ShimmerBLEGRPC_BluetoothState.connecting, + message: "Connecting") + + _ = bluetoothManager?.startScanning(deviceName: self.deviceNameToConnect, timeout: 3) try await Task.sleep(for: .seconds(4)) - while(bluetoothDeviceMap.keys.contains(deviceNameToConnect)) { - //this keeps the response GRPCCore.RPCWriter<> open + // Keep the writer open while the device stays connected (exists in map) + while bluetoothDeviceMap[deviceNameToConnect] != nil { try await Task.sleep(for: .seconds(0.1)) //sleep 100ms } } else { print("Received connectShimmer request for: " + deviceNameToConnect) print("Error: connection attempt already in progress!") - await writeStatusResponseWithRPCWriter(state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Connection failed! Existing connection attempt in progress", writer: response) + await writeStatusResponseWithRPCWriter( + state: ShimmerBLEGRPC_BluetoothState.disconnected, + message: "Connection failed! Existing connection attempt in progress", + writer: response + ) } } @@ -82,16 +98,15 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService } private func writeStatusResponseWithRPCWriter(state: ShimmerBLEGRPC_BluetoothState, message: String, writer: GRPCCore.RPCWriter?) async { - if(writer != nil) { - let status = ShimmerBLEGRPC_StateStatus.with { - $0.state = state - $0.message = message - } - do { - try await writer?.write(status) - } catch let error { - print(error) - } + guard let writer = writer else { return } + let status = ShimmerBLEGRPC_StateStatus.with { + $0.state = state + $0.message = message + } + do { + try await writer.write(status) + } catch { + print("writeStatusResponse error:", error) } } @@ -108,52 +123,64 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService func getDataStream(request: ShimmerBLEGRPC_StreamRequest, response: GRPCCore.RPCWriter, context: GRPCCore.ServerContext) async throws { print("Received getDataStream request for: " + request.message) - while(bluetoothDeviceMap.keys.contains(request.message)) { - if(queueMap.keys.contains(request.message)) { + while bluetoothDeviceMap[request.message] != nil { + if let q = queueMap[request.message] { var data = Data() - while(!queueMap[request.message]!.isEmpty) { - data.append(queueMap[request.message]?.dequeue() ?? Data()) + while !q.isEmpty { + data.append(q.dequeue() ?? Data()) } - - let res = ShimmerBLEGRPC_ObjectClusterByteArray.with { - $0.bluetoothAddress = request.message - $0.binaryData = data - $0.calibratedTimeStamp = Double(Date().timeIntervalSince1970 * 1_000) //Unix timestamp in milliseconds + + if !data.isEmpty { + let res = ShimmerBLEGRPC_ObjectClusterByteArray.with { + $0.bluetoothAddress = request.message + $0.binaryData = data + $0.calibratedTimeStamp = Double(Date().timeIntervalSince1970 * 1_000) //Unix timestamp in milliseconds + } + try await response.write(res) } - try await response.write(res) } - try await Task.sleep(for: .seconds(0.001)) //sleep 1ms } - } func startConnectShimmer() async { - let peripheral = bluetoothManager?.getPeripheral(deviceName: deviceNameToConnect) - if(peripheral != nil) { + //Always reset isConnecting at the end + defer { isConnecting = false } + + guard let peripheral = bluetoothManager?.getPeripheral(deviceName: deviceNameToConnect) else { + await writeStatusResponse(deviceName: deviceNameToConnect, + state: ShimmerBLEGRPC_BluetoothState.disconnected, + message: "Failed to discover device") + return + } + + //Create radio first, connect, and only after that register in maps + let radio = BleByteRadio(deviceName: deviceNameToConnect, + cbperipheral: peripheral, + bluetoothManager: bluetoothManager!) + radio.delegate = self + + let success = await radio.connect() + if success ?? false { bluetoothDeviceMap[deviceNameToConnect] = peripheral queueMap[deviceNameToConnect] = ConcurrentQueue() - - let radio = BleByteRadio(deviceName: deviceNameToConnect,cbperipheral: peripheral!,bluetoothManager: bluetoothManager!) - radio.delegate = self - - let success = await radio.connect() - if(success ?? false) { - radioMap[deviceNameToConnect] = radio - await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.connected, message: "Success") - } else { - await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Radio failed to connect") - } + radioMap[deviceNameToConnect] = radio + await writeStatusResponse(deviceName: deviceNameToConnect, + state: ShimmerBLEGRPC_BluetoothState.connected, + message: "Success") } else { - await writeStatusResponse(deviceName: deviceNameToConnect, state: ShimmerBLEGRPC_BluetoothState.disconnected, message: "Failed to discover device") + await writeStatusResponse(deviceName: deviceNameToConnect, + state: ShimmerBLEGRPC_BluetoothState.disconnected, + message: "Radio failed to connect") } - - isConnecting = false } func startDisconnectShimmer(name: String) { - Task { - await radioMap[name]?.disconnect() + //Run the disconnect and cleanup on the main actor + Task { @MainActor in + if let radio = radioMap[name] { + await radio.disconnect() + } bluetoothDeviceMap.removeValue(forKey: name) connectStreamMap.removeValue(forKey: name) queueMap.removeValue(forKey: name) @@ -165,9 +192,9 @@ final class ShimmerBLEService: ShimmerBLEGRPC_ShimmerBLEByteServer.SimpleService extension ShimmerBLEService : BluetoothManagerDelegate { func scanCompleted() { - if(isConnecting) { + if isConnecting { Task { - await startConnectShimmer() + await startConnectShimmer() // will hop onto the main actor } } } @@ -189,9 +216,9 @@ extension ShimmerBLEService : ByteCommunicationDelegate { } func byteCommunicationDataReceived(data: Data?, deviceName: String) { - let queue = queueMap[deviceName] - if(data != nil) { - queue?.enqueue(data ?? Data()) - } + //Avoid optional chaining and ignore nil data + guard let data = data, let queue = queueMap[deviceName] else { return } + queue.enqueue(data) } } + diff --git a/ShimmerBLEGrpc/Sources/ShimmerServer.swift b/ShimmerBLEGrpc/Sources/ShimmerServer.swift index ce44b48..7cc6a93 100644 --- a/ShimmerBLEGrpc/Sources/ShimmerServer.swift +++ b/ShimmerBLEGrpc/Sources/ShimmerServer.swift @@ -9,6 +9,9 @@ import ArgumentParser import GRPCCore import GRPCNIOTransportHTTP2 import GRPCProtobuf +import Foundation +import Darwin + @main struct ShimmerServer: AsyncParsableCommand { @@ -18,7 +21,12 @@ struct ShimmerServer: AsyncParsableCommand { var port: Int = 50052 func run() async throws { - let server = GRPCServer( + // Disable buffering so logs aren’t batched when stdout/stderr are pipes + setbuf(stdout, nil) + setbuf(stderr, nil) + installSignalHandlers() + + let server = await GRPCServer( transport: .http2NIOPosix( address: .ipv4(host: "127.0.0.1", port: self.port), transportSecurity: .plaintext @@ -30,8 +38,22 @@ struct ShimmerServer: AsyncParsableCommand { group.addTask { try await server.serve() } if let address = try await server.listeningAddress { print("Shimmer BLE gRPC listening on \(address)") - print("Server Version: v1.0.0") + print("Server Version: v1.0.1") } } } + + // Log termination signals to help diagnose unexpected exits + private func installSignalHandlers() { + let signals = [SIGTERM, SIGHUP, SIGINT, SIGQUIT] + for sig in signals { + signal(sig) { s in + let msg = "[Swift] received signal \( s)\n" + FileHandle.standardError.write(Data(msg.utf8)) + // Ensure logs are flushed before exiting + fflush(stdout); fflush(stderr) + _exit(128 + s) // Conventional exit for signaled process + } + } + } } diff --git a/ShimmerBLEGrpc/Sources/Subcommands/Serve.swift b/ShimmerBLEGrpc/Sources/Subcommands/Serve.swift index 7268705..e48a4f6 100644 --- a/ShimmerBLEGrpc/Sources/Subcommands/Serve.swift +++ b/ShimmerBLEGrpc/Sources/Subcommands/Serve.swift @@ -26,7 +26,7 @@ struct Serve: AsyncParsableCommand { var port: Int = 50052 func run() async throws { - let server = GRPCServer( + let server = await GRPCServer( transport: .http2NIOPosix( address: .ipv4(host: "127.0.0.1", port: self.port), transportSecurity: .plaintext diff --git a/ShimmerBluetooth/ShimmerBluetooth/BleByteRadio.swift b/ShimmerBluetooth/ShimmerBluetooth/BleByteRadio.swift index 1155d76..afdb54e 100644 --- a/ShimmerBluetooth/ShimmerBluetooth/BleByteRadio.swift +++ b/ShimmerBluetooth/ShimmerBluetooth/BleByteRadio.swift @@ -98,16 +98,16 @@ public class BleByteRadio : NSObject, ByteCommunication { public func writeBytes(bytes: [UInt8])->Bool { let data = Data(bytes) guard let char = self.characteristics[RBL_CHAR_TX_UUID] else { return false} - print("Write Data \(bytes)") - print(char.uuid.uuidString) +// print("Write Data \(bytes)") +// print(char.uuid.uuidString) self.activePeripheral?.writeValue(data, for: char, type: .withResponse) return true } public func writeData(data: Data) ->Bool { guard let char = self.characteristics[RBL_CHAR_TX_UUID] else { return false} - print("[DEBUG] Write Data \(data)") - print(char.uuid.uuidString) +// print("[DEBUG] Write Data \(data)") +// print(char.uuid.uuidString) self.activePeripheral?.writeValue(data, for: char, type: .withResponse) return true }