diff --git a/.gitignore b/.gitignore index a15edc62..b57f9e7a 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,7 @@ build-dir/ /lib/l10n/*.dart /untranslated-messages.txt + +# Voice server +/voice_server/voice_server +/voice_server/go.sum diff --git a/docs/VOICE_CHAT.md b/docs/VOICE_CHAT.md new file mode 100644 index 00000000..eb9f1037 --- /dev/null +++ b/docs/VOICE_CHAT.md @@ -0,0 +1,421 @@ +# Voice Chat Feature Documentation + +## Overview + +The voice chat feature adds TeamSpeak/Discord-like voice communication to Camelus. It includes: + +- Voice-only communication (for now) +- Low-latency WebRTC connections with SFU +- Multiple channels per server +- Tree-like channel structure +- Overview of users in channels +- Speaking indicators +- Mute functionality +- **Connection state monitoring with ping/latency tracking** +- **Automatic reconnection with backoff** +- **Debug info panel for troubleshooting** + +## Architecture + +The implementation follows clean architecture principles: + +### Client (Flutter) + +#### Domain Layer (`lib/domain_layer/entities/voice_chat/`) +- **VoiceUser**: Represents a user in the voice chat system + - Properties: id, npub, displayName, group, channelId, isSpeaking, isMuted +- **VoiceChannel**: Represents a voice channel + - Properties: id, name, parentId, position, userIds + - Supports tree hierarchy through parentId +- **ChannelState**: Manages the overall state + - Contains channels and users + - Helper methods for querying child channels and users in channels +- **UserGroup**: Enum for user permissions (anon, member, admin) + +#### Presentation Layer + +**Providers** (`lib/presentation_layer/providers/voice_chat/`): +- **VoiceChatProvider** (Modern Riverpod `Notifier`): Manages WebSocket connection + - Handles authentication, channel joining, state updates + - Real-time synchronization with server + - Connection status tracking (disconnected, connecting, connected, error) + - Ping/pong with latency measurement + - Automatic 
reconnection with linear backoff (2-second steps, up to 5 attempts) +- **WebRTCProvider** (Modern Riverpod `Notifier`): Manages WebRTC peer connections + - Local media stream initialization + - Peer connection management + - ICE candidate handling + - Proper cleanup with `ref.onDispose()` + +**UI** (`lib/presentation_layer/routes/voice_chat/`): +- **VoiceChatPage**: Main interface + - Connection screen with server URL input + - **Color-coded connection status indicator in app bar** + - **Expandable debug info panel** (ping, latency, reconnects, errors) + - Channel tree view (left panel) + - Current channel user list (right panel) + - Speaking indicators (green dot on avatar) + - Control bar (mute, disconnect) + +### Server (Go) + +Located in `voice_server/` - **Now organized into packages**: + +#### Package Structure +``` +pkg/ +├── models/ - Data structures (Config, User, Channel, Message) +├── websocket/ - WebSocket utilities & ping/pong handlers +└── server/ - Main server logic + ├── server.go - Server struct & WebSocket handler + ├── handlers.go - Message routing & handlers + ├── broadcast.go - State updates & broadcasting + └── webrtc.go - WebRTC SFU implementation +``` + +#### Components +- **WebSocket Handler**: Manages persistent connections for API/signaling + - Proper upgrade headers and CORS + - Ping/pong keepalive (54s interval) + - Connection timeouts (60s read, 10s write) +- **WebRTC SFU**: Forwards audio tracks between users in same channel +- **Channel Manager**: Handles channel state and user assignments +- **User Manager**: Manages user states and permissions +- **Broadcaster**: Distributes state changes efficiently + +#### Technology +- **gorilla/websocket**: For WebSocket connections with compression +- **pion/webrtc**: Pure Go implementation of WebRTC for SFU +- Hybrid architecture: WebSocket for state, WebRTC for media +- ICE/STUN for NAT traversal +- Application-level ping/pong for latency tracking + +#### Configuration +Server is configured via `config.yaml`: +```yaml 
+server: + host: 0.0.0.0 + port: 8080 + +channels: + - id: lobby + name: Lobby + position: 0 + parent_id: null + +user_groups: + admin: + - npub1... + member: + - npub2... + anon: [] +``` + +## Getting Started + +### 1. Start the Voice Server + +```bash +cd voice_server +go run main.go -config config.yaml +``` + +The server will start on `ws://localhost:8080` by default (WebSocket endpoint). + +### 2. Configure Channels + +Edit `voice_server/config.yaml` to customize your channel structure: + +- Set unique `id` for each channel +- Use `parent_id` to create hierarchy (null for root channels) +- Order channels with `position` field +- Define user groups with Nostr npubs + +### 3. Connect from Camelus App + +1. Open Camelus app +2. Navigate to "Voice Chat" in the drawer menu +3. Enter your server URL (e.g., `ws://localhost:8080`) +4. Click "Connect" (WebSocket connection established) +5. Client creates WebRTC peer connection for audio +6. Select a channel from the tree view +7. Start talking! + +## Features + +### Channel Navigation +- Tree-like structure with parent/child relationships +- Click any channel to join +- Current channel is highlighted +- User count badge shows active users in each channel + +### User Indicators +- **Green dot**: User is speaking +- **Microphone slash icon**: User is muted +- **Avatar**: Shows first letter of display name + +### Controls +- **Mute button**: Toggle your microphone +- **Disconnect button**: Leave the voice server + +## Protocol + +### Hybrid Architecture + +The server uses two separate protocols: + +1. **WebSocket (ws://)**: For API, signaling, state management +2. **WebRTC**: For audio/video data (SFU) + +### Connection Flow + +1. **WebSocket Connection (API)** + - Client connects to `ws://server:8080/` + - Sends authentication message + - Receives initial state + - Subscribes to state updates + +2. 
**WebRTC Connection (Media)** + - Client creates WebRTC peer connection + - Sends offer via WebSocket: `{"type": "webrtc_offer", "sdp": {...}}` + - Receives answer via WebSocket: `{"type": "webrtc_answer", "sdp": {...}}` + - ICE candidates exchanged via WebSocket + - Audio tracks sent via WebRTC + - Server forwards audio to other users in same channel (SFU) + +### WebSocket Messages + +#### Client → Server + +**Authentication**: +```json +{ + "type": "auth", + "npub": "npub1..." +} +``` + +**Join Channel**: +```json +{ + "type": "join_channel", + "channel_id": "general" +} +``` + +**Toggle Mute**: +```json +{ + "type": "toggle_mute", + "muted": true +} +``` + +**WebRTC Offer** (for media): +```json +{ + "type": "webrtc_offer", + "sdp": {"type": "offer", "sdp": "..."} +} +``` + +**ICE Candidate**: +```json +{ + "type": "webrtc_candidate", + "candidate": {...} +} +``` + +**Speaking State**: +```json +{ + "type": "speaking", + "is_speaking": true +} +``` + +#### Server → Client + +**Initial State**: +```json +{ + "type": "state", + "data": { + "channels": [...], + "users": [...] + } +} +``` + +**WebRTC Answer** (response to offer): +```json +{ + "type": "webrtc_answer", + "sdp": {"type": "answer", "sdp": "..."} +} +``` + +**ICE Candidate**: +```json +{ + "type": "webrtc_candidate", + "candidate": {...} +} +``` + +**User Updates**: +```json +{ + "type": "user_joined|user_left|user_moved|user_speaking", + "data": {...} +} +``` + +## Security + +### User Groups +Three levels of permissions: +- **anon**: Anonymous users (no npub or unknown npub) +- **member**: Registered members (npub in member list) +- **admin**: Administrators (npub in admin list) + +Current implementation identifies users by npub, with room for future permission-based features. 
+ +### Authentication +- Users authenticate with their Nostr npub +- Server validates npub against configured user groups +- Anonymous access allowed (as "anon" group) +- WebSocket secured with TLS (wss://) in production +- WebRTC connections encrypted by default (DTLS) + +### Network Security +- WebSocket for control plane (secure with TLS) +- WebRTC uses DTLS for encryption +- ICE/STUN for NAT traversal +- CORS configured (restrict in production) +- Consider TURN servers for production + +## Development + +### Adding New Features + +#### Client Side +1. Add new message types to `VoiceChatProvider` +2. Update UI in `VoiceChatPage` +3. Add new provider methods as needed + +#### Server Side +1. Add message handlers in `handleMessage()` +2. Implement business logic +3. Broadcast updates to clients + +### Testing Locally + +**Terminal 1** - Start server: +```bash +cd voice_server +go run main.go +``` + +**Terminal 2** - Run Flutter app: +```bash +flutter run +``` + +Connect to `ws://localhost:8080` from the app. 
+ +## Future Enhancements + +Planned features: +- [ ] Audio streams via WebRTC (foundation ready with pion/webrtc) +- [ ] Channel permissions based on user groups +- [ ] Push-to-talk mode +- [ ] Audio quality settings +- [ ] Screen sharing +- [ ] Text chat per channel +- [ ] Channel creation/deletion +- [ ] User kick/ban for admins +- [ ] Persistent channel state +- [ ] Multiple servers management + +## Troubleshooting + +### Connection Issues +- Verify server is running: check terminal output +- Check server URL format: should start with `ws://` for WebSocket +- Ensure port is accessible (firewall, network) +- Check the app's debug console (or the browser console when running on web) for WebRTC errors + +### No Audio +- Grant microphone permissions to the app +- Check WebRTC implementation (currently basic structure) +- Verify browser/platform WebRTC support + +### Missing Channels +- Check `config.yaml` syntax +- Restart server after config changes +- Verify channel IDs are unique + +### User Not Showing +- Ensure WebSocket connection is established +- Check authentication message sent via WebSocket +- Look for errors in server logs +- Verify WebRTC peer connection for audio + +## Technical Details + +### State Management +- **Riverpod `Notifier`** for reactive state +- Immutable state objects with copyWith +- Efficient updates through targeted broadcasts + +### Communication Protocol +- WebSocket for signaling and state (JSON messages) +- WebRTC for media streams (audio only for now) +- SFU forwards audio between users in same channel +- WebRTC media is always encrypted (DTLS); WebSocket traffic is encrypted only when served over `wss://` (TLS) + +### Channel Hierarchy +- Implemented as parent-child relationships +- Unlimited nesting depth +- Position-based ordering at each level + +### Broadcasting Strategy +- Delta updates (only what changed) +- User-specific filtering where needed +- Efficient serialization with JSON +- The WebSocket connection provides ordered, reliable delivery of state messages + +## API Reference + +### VoiceChatNotifier Methods + +```dart +Future connect(String serverUrl, 
String? npub) +Future disconnect() +void joinChannel(String channelId) +void toggleMute() +``` + +### ChannelState Methods + +```dart +List getChildChannels(String? parentId) +List getUsersInChannel(String channelId) +``` + +## Contributing + +When contributing to voice chat features: + +1. Follow clean architecture patterns +2. Keep state immutable +3. Add proper error handling +4. Update this documentation +5. Test with multiple clients +6. Consider low-latency requirements + +## License + +Same as the main Camelus project. diff --git a/docs/VOICE_CHAT_QUICKSTART.md b/docs/VOICE_CHAT_QUICKSTART.md new file mode 100644 index 00000000..d80632bb --- /dev/null +++ b/docs/VOICE_CHAT_QUICKSTART.md @@ -0,0 +1,153 @@ +# Voice Chat Quick Start Guide + +Get up and running with voice chat in 5 minutes! + +## 1. Start the Server + +```bash +# Navigate to voice server directory +cd voice_server + +# Copy example config (first time only) +cp config.example.yaml config.yaml + +# Start the server +go run main.go +``` + +You should see: +``` +2026/02/13 10:38:50 Voice server listening on 0.0.0.0:8080 +2026/02/13 10:38:50 WebSocket endpoint: ws://0.0.0.0:8080/ +2026/02/13 10:38:50 WebRTC SFU enabled for audio forwarding +``` + +## 2. Connect from Camelus + +1. **Open Camelus app** + - Run on your preferred platform (mobile/desktop) + +2. **Navigate to Voice Chat** + - Open the drawer menu (☰) + - Click "Voice Chat" (🎤 icon) + +3. **Connect to Server** + - Enter server URL: `ws://localhost:8080` + - Click "Connect" (WebSocket connection + WebRTC for audio) + +## 3. Join a Channel + +You'll see a channel tree like: +``` +🏠 Lobby + 💬 General +🎮 Gaming + 🎤 Voice 1 + 🎤 Voice 2 +🎵 Music + 🎧 Listening Room + 🎸 Jam Session +``` + +Click any channel to join! + +## 4. 
Test Voice Communication + +- **Mute/Unmute**: Click the microphone button +- **Disconnect**: Click the red phone button +- **Speaking Indicator**: Green dot appears when someone talks +- **User Count**: Badge shows number of users in each channel + +## Customizing Your Server + +Edit `voice_server/config.yaml` to: + +### Add Channels + +```yaml +channels: + - id: my-channel + name: "My Cool Channel" + position: 0 + parent_id: null # Root channel +``` + +### Set User Groups + +```yaml +user_groups: + admin: + - npub1youradminpubkeyxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + member: + - npub1yourmemberpubkeyxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx +``` + +Restart the server after config changes. + +## Troubleshooting + +**Can't connect?** +- Server running? Check the terminal +- Correct URL? Should be `ws://localhost:8080` +- Firewall? May need to allow port 8080 +- Check browser console for errors + +**No audio?** +- WebRTC SFU is ready on server side +- Check microphone permissions +- Platform WebRTC support required +- Check WebRTC connection established + +**Channel not showing?** +- Check config.yaml syntax (valid YAML) +- Restart server after changes +- Check server logs for errors + +## Next Steps + +- Read [VOICE_CHAT.md](../docs/VOICE_CHAT.md) for full documentation +- Customize channels in config.yaml +- Set up user groups with real Nostr npubs +- Deploy server for remote access + +## Remote Access + +To connect from other devices: + +1. **Get your server's IP address** + ```bash + # Linux/Mac + ifconfig | grep inet + + # Windows + ipconfig + ``` + +2. **Update config** (optional) + ```yaml + server: + host: 0.0.0.0 # Listen on all interfaces + port: 8080 + ``` + +3. **Connect from app** + - Use `ws://YOUR_IP:8080` + - Example: `ws://192.168.1.100:8080` + +4. 
**Production deployment** + - Use reverse proxy (nginx) + - Enable TLS: `wss://your-domain.com` + - Configure TURN servers for NAT traversal + - Configure firewall rules + +## Tips + +- **Multiple servers**: Each server can have different channels +- **Tree structure**: Nest channels up to any depth +- **User groups**: Plan your permission structure +- **Low latency**: Hybrid WebSocket + WebRTC architecture +- **Clean architecture**: Easy to extend and customize +- **SFU**: Server forwards audio between participants +- **Encrypted**: Both WebSocket (TLS) and WebRTC (DTLS) can be encrypted + +Happy chatting! 🎉 diff --git a/lib/domain_layer/entities/voice_chat/channel_state.dart b/lib/domain_layer/entities/voice_chat/channel_state.dart new file mode 100644 index 00000000..8f85461c --- /dev/null +++ b/lib/domain_layer/entities/voice_chat/channel_state.dart @@ -0,0 +1,41 @@ +import 'voice_channel.dart'; +import 'voice_user.dart'; + +class ChannelState { + final List channels; + final List users; + final String? currentUserId; + final String? currentChannelId; + + ChannelState({ + this.channels = const [], + this.users = const [], + this.currentUserId, + this.currentChannelId, + }); + + ChannelState copyWith({ + List? channels, + List? users, + String? currentUserId, + String? currentChannelId, + }) { + return ChannelState( + channels: channels ?? this.channels, + users: users ?? this.users, + currentUserId: currentUserId ?? this.currentUserId, + currentChannelId: currentChannelId ?? this.currentChannelId, + ); + } + + List getChildChannels(String? 
/// Permission tier assigned to a voice-chat user.
enum UserGroup {
  anon,
  member,
  admin;

  /// Maps a group name to its [UserGroup], ignoring letter case.
  ///
  /// Any value that is not the name of a group yields [UserGroup.anon].
  static UserGroup fromString(String value) {
    final wanted = value.toLowerCase();
    for (final group in UserGroup.values) {
      if (group.name == wanted) {
        return group;
      }
    }
    return UserGroup.anon;
  }
}
/// Immutable snapshot of one participant on the voice server.
class VoiceUser {
  /// Marker default for [copyWith.channelId]; distinguishes "argument
  /// omitted" from an explicit `null` ("user is in no channel").
  static const Object _unset = Object();

  final String id;
  final String? npub;
  final String? displayName;
  final UserGroup group;

  /// Channel the user currently sits in, or `null` when in none.
  final String? channelId;
  final bool isSpeaking;
  final bool isMuted;

  VoiceUser({
    required this.id,
    this.npub,
    this.displayName,
    required this.group,
    this.channelId,
    this.isSpeaking = false,
    this.isMuted = false,
  });

  /// Returns a copy with the given fields replaced.
  ///
  /// For every field except [channelId], passing `null` (or omitting the
  /// argument) keeps the current value. [channelId] is different: omitting it
  /// keeps the current channel, while passing `null` explicitly clears it.
  /// This is what a `user_moved` update with no channel needs; previously the
  /// `??` fallback silently kept the old channel in that case.
  ///
  /// [channelId] is declared `Object?` only so the sentinel default can be
  /// told apart from `null`; callers must pass a `String` or `null`.
  VoiceUser copyWith({
    String? id,
    String? npub,
    String? displayName,
    UserGroup? group,
    Object? channelId = _unset,
    bool? isSpeaking,
    bool? isMuted,
  }) {
    return VoiceUser(
      id: id ?? this.id,
      npub: npub ?? this.npub,
      displayName: displayName ?? this.displayName,
      group: group ?? this.group,
      channelId:
          identical(channelId, _unset) ? this.channelId : channelId as String?,
      isSpeaking: isSpeaking ?? this.isSpeaking,
      isMuted: isMuted ?? this.isMuted,
    );
  }
}
/// Owns the voice-chat signaling WebSocket and mirrors what the server sends
/// into [VoiceChatState].
///
/// Responsibilities: opening/closing the socket, sending `auth`, periodic
/// application-level `ping`, bounded reconnection with linear backoff, and
/// applying `state` / `user_*` / `pong` / `error` messages to the state.
class VoiceChatNotifier extends Notifier<VoiceChatState> {
  /// Reconnection gives up after this many consecutive failed attempts.
  static const int _maxReconnectAttempts = 5;

  /// Interval between application-level `ping` messages.
  static const Duration _pingInterval = Duration(seconds: 30);

  WebSocketChannel? _channel;
  StreamSubscription<dynamic>? _subscription;
  Timer? _pingTimer;

  /// Pending reconnect, tracked so it can be cancelled on disconnect/dispose
  /// and so a single drop (which fires both onError and onDone) schedules
  /// only one retry.
  Timer? _reconnectTimer;

  /// Mute state used by [toggleMute] when the current user cannot be found
  /// in the user list.
  bool _localMuted = false;

  @override
  VoiceChatState build() {
    ref.onDispose(_cleanup);
    return VoiceChatState(channelState: ChannelState());
  }

  /// Synchronous teardown for provider disposal; does not touch [state].
  void _cleanup() {
    _reconnectTimer?.cancel();
    _pingTimer?.cancel();
    _subscription?.cancel();
    _channel?.sink.close();
    _reconnectTimer = null;
    _pingTimer = null;
    _subscription = null;
    // Clearing the handle lets an in-flight _open() notice it was superseded.
    _channel = null;
  }

  /// Connects to [serverUrl] and authenticates with [npub] (may be null for
  /// anonymous access).
  ///
  /// Any existing connection and any pending reconnect are dropped first, and
  /// the reconnect budget starts from zero. Failures are reported through
  /// [VoiceChatState.error]; this method does not throw.
  Future<void> connect(String serverUrl, String? npub) async {
    await disconnect();
    await _open(serverUrl, npub);
  }

  /// Opens the socket without resetting the reconnect counter, so automatic
  /// retries keep counting towards [_maxReconnectAttempts].
  Future<void> _open(String serverUrl, String? npub) async {
    await _closeSocket();

    state = state.copyWith(
      connectionStatus: ConnectionStatus.connecting,
      serverUrl: serverUrl,
      error: null,
    );

    WebSocketChannel? channel;
    try {
      channel = WebSocketChannel.connect(Uri.parse(serverUrl));
      _channel = channel;

      // Queued by the sink until the socket is open.
      channel.sink.add(jsonEncode({'type': 'auth', 'npub': npub}));

      _subscription = channel.stream.listen(
        _handleMessage,
        onError: (Object error) {
          state = state.copyWith(
            error: 'Connection error: $error',
            connectionStatus: ConnectionStatus.error,
          );
          _scheduleReconnect(serverUrl, npub);
        },
        onDone: () {
          _pingTimer?.cancel();
          // copyWith drops the error unless it is passed again; keep it so
          // the reason for the drop stays visible.
          state = state.copyWith(
            connectionStatus: ConnectionStatus.disconnected,
            error: state.error,
          );
          _scheduleReconnect(serverUrl, npub);
        },
      );

      // WebSocketChannel.connect returns before the handshake finishes; only
      // report "connected" (and reset the retry budget) once it succeeded.
      await channel.ready;
      if (!identical(_channel, channel)) return; // superseded while waiting

      state = state.copyWith(
        connectionStatus: ConnectionStatus.connected,
        error: null,
        reconnectAttempts: 0,
      );
      _startPingTimer();
    } catch (e) {
      if (channel != null && !identical(_channel, channel)) return;
      // Retrying is left to the stream's onError/onDone callbacks.
      state = state.copyWith(
        error: 'Failed to connect: $e',
        connectionStatus: ConnectionStatus.error,
      );
    }
  }

  /// Cancels the ping timer and closes the current socket, if any.
  Future<void> _closeSocket() async {
    _pingTimer?.cancel();
    _pingTimer = null;

    final subscription = _subscription;
    final channel = _channel;
    _subscription = null;
    _channel = null;

    await subscription?.cancel();
    await channel?.sink.close();
  }

  /// Sends a `ping` carrying the send time every [_pingInterval] while
  /// connected; the matching `pong` is handled in [_handlePong].
  void _startPingTimer() {
    _pingTimer?.cancel();
    _pingTimer = Timer.periodic(_pingInterval, (_) {
      final channel = _channel;
      if (!state.isConnected || channel == null) return;

      final message = {
        'type': 'ping',
        'timestamp': DateTime.now().millisecondsSinceEpoch,
      };
      try {
        channel.sink.add(jsonEncode(message));
      } catch (e) {
        // Connection might be dead
        state = state.copyWith(connectionStatus: ConnectionStatus.error);
      }
    });
  }

  /// Schedules one reconnect attempt after `attempt * 2` seconds, up to
  /// [_maxReconnectAttempts] attempts.
  void _scheduleReconnect(String serverUrl, String? npub) {
    // onError and onDone both arrive for one drop; count it once.
    if (_reconnectTimer?.isActive ?? false) return;

    if (state.reconnectAttempts >= _maxReconnectAttempts) {
      state = state.copyWith(
        error: 'Max reconnection attempts reached',
        connectionStatus: ConnectionStatus.error,
      );
      return;
    }

    final attempt = state.reconnectAttempts + 1;
    state = state.copyWith(reconnectAttempts: attempt, error: state.error);

    _reconnectTimer = Timer(Duration(seconds: attempt * 2), () {
      _reconnectTimer = null;
      if (state.connectionStatus != ConnectionStatus.connected) {
        _open(serverUrl, npub);
      }
    });
  }

  /// Closes the connection and cancels any pending automatic reconnect.
  Future<void> disconnect() async {
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _localMuted = false;
    await _closeSocket();

    // NOTE(review): VoiceChatState.copyWith treats a null serverUrl as "keep
    // the current value", so the URL is not actually cleared here.
    state = state.copyWith(
      connectionStatus: ConnectionStatus.disconnected,
      serverUrl: null,
      reconnectAttempts: 0,
    );
  }

  /// Decodes one incoming frame and dispatches on its `type` field. Unknown
  /// types are ignored; anything that fails to decode or parse is reported
  /// through [VoiceChatState.error].
  void _handleMessage(dynamic message) {
    try {
      final data = jsonDecode(message as String) as Map<String, dynamic>;

      switch (data['type'] as String?) {
        case 'state':
          _handleStateUpdate(data);
          break;
        case 'user_joined':
          _handleUserJoined(data);
          break;
        case 'user_left':
          _handleUserLeft(data);
          break;
        case 'user_speaking':
          _handleUserSpeaking(data);
          break;
        case 'user_moved':
          _handleUserMoved(data);
          break;
        case 'pong':
          _handlePong(data);
          break;
        case 'error':
          state = state.copyWith(error: data['message'] as String?);
          break;
      }
    } catch (e) {
      state = state.copyWith(error: 'Failed to parse message: $e');
    }
  }

  /// Records round-trip latency from the timestamp echoed in a `pong`.
  void _handlePong(Map<String, dynamic> data) {
    final sentAt = (data['timestamp'] as num?)?.toInt();
    if (sentAt == null) return;

    final now = DateTime.now();
    state = state.copyWith(
      lastPingTime: now,
      pingLatencyMs: now.millisecondsSinceEpoch - sentAt,
    );
  }

  /// Builds a [VoiceUser] from the JSON shape used by `state` and
  /// `user_joined` messages.
  static VoiceUser _parseUser(Map<String, dynamic> json) {
    return VoiceUser(
      id: json['id'] as String,
      npub: json['npub'] as String?,
      displayName: json['display_name'] as String?,
      group: UserGroup.fromString(json['group'] as String? ?? 'anon'),
      channelId: json['channel_id'] as String?,
      isSpeaking: json['is_speaking'] as bool? ?? false,
      isMuted: json['is_muted'] as bool? ?? false,
    );
  }

  /// Builds a [VoiceChannel] from the JSON shape used by `state` messages.
  static VoiceChannel _parseChannel(Map<String, dynamic> json) {
    return VoiceChannel(
      id: json['id'] as String,
      name: json['name'] as String,
      parentId: json['parent_id'] as String?,
      position: (json['position'] as num).toInt(),
      userIds: <String>[
        for (final id in (json['user_ids'] as List<dynamic>? ?? const []))
          id as String,
      ],
    );
  }

  /// Replaces the user list, leaving the rest of the channel state untouched.
  void _setUsers(List<VoiceUser> users) {
    state = state.copyWith(
      channelState: state.channelState.copyWith(users: users),
    );
  }

  /// Applies a full `state` snapshot (channels and users) from the server.
  void _handleStateUpdate(Map<String, dynamic> data) {
    try {
      final stateData = data['data'] as Map<String, dynamic>?;
      if (stateData == null) {
        state = state.copyWith(error: 'Invalid state update: missing data field');
        return;
      }

      final channels = <VoiceChannel>[
        for (final c in (stateData['channels'] as List<dynamic>? ?? const []))
          _parseChannel(c as Map<String, dynamic>),
      ];
      final users = <VoiceUser>[
        for (final u in (stateData['users'] as List<dynamic>? ?? const []))
          _parseUser(u as Map<String, dynamic>),
      ];

      state = state.copyWith(
        channelState: state.channelState.copyWith(
          channels: channels,
          users: users,
        ),
      );
    } catch (e) {
      state = state.copyWith(error: 'Failed to parse state update: $e');
    }
  }

  /// Adds the user from a `user_joined` message, replacing any entry that
  /// already has the same id so a repeated join cannot create duplicates.
  void _handleUserJoined(Map<String, dynamic> data) {
    final userData = data['user'] as Map<String, dynamic>?;
    if (userData == null) return;

    final joined = _parseUser(userData);
    _setUsers(<VoiceUser>[
      for (final VoiceUser u in state.channelState.users)
        if (u.id != joined.id) u,
      joined,
    ]);
  }

  /// Removes the user named by a `user_left` message.
  void _handleUserLeft(Map<String, dynamic> data) {
    final userId = data['user_id'] as String?;
    if (userId == null) return;

    _setUsers(<VoiceUser>[
      for (final VoiceUser u in state.channelState.users)
        if (u.id != userId) u,
    ]);
  }

  /// Updates the speaking flag of the user named by a `user_speaking` message.
  void _handleUserSpeaking(Map<String, dynamic> data) {
    final userId = data['user_id'] as String?;
    final isSpeaking = data['is_speaking'] as bool?;
    if (userId == null || isSpeaking == null) return;

    _setUsers(<VoiceUser>[
      for (final VoiceUser u in state.channelState.users)
        u.id == userId ? u.copyWith(isSpeaking: isSpeaking) : u,
    ]);
  }

  /// Moves the user named by a `user_moved` message to `channel_id`, which
  /// may be null (user is no longer in any channel).
  void _handleUserMoved(Map<String, dynamic> data) {
    final userId = data['user_id'] as String?;
    final channelId = data['channel_id'] as String?;
    if (userId == null) return;

    _setUsers(<VoiceUser>[
      for (final VoiceUser u in state.channelState.users)
        if (u.id == userId)
          // Built explicitly so a null channel id really clears the channel
          // instead of being read as "keep the old one".
          VoiceUser(
            id: u.id,
            npub: u.npub,
            displayName: u.displayName,
            group: u.group,
            channelId: channelId,
            isSpeaking: u.isSpeaking,
            isMuted: u.isMuted,
          )
        else
          u,
    ]);
  }

  /// Asks the server to move the current user into [channelId] and records it
  /// optimistically as the current channel. No-op while disconnected.
  void joinChannel(String channelId) {
    final channel = _channel;
    if (!state.isConnected || channel == null) return;

    channel.sink.add(jsonEncode({
      'type': 'join_channel',
      'channel_id': channelId,
    }));

    state = state.copyWith(
      channelState: state.channelState.copyWith(currentChannelId: channelId),
    );
  }

  /// Sends `toggle_mute` with the inverse of the current mute state. No-op
  /// while disconnected.
  ///
  /// The current state is taken from the current user's entry in the user
  /// list when it can be found. This notifier never assigns `currentUserId`,
  /// so when that lookup fails a locally tracked flag is used instead;
  /// without it every call would send `muted: true` and unmuting would be
  /// impossible.
  void toggleMute() {
    final channel = _channel;
    if (!state.isConnected || channel == null) return;

    final selfId = state.channelState.currentUserId;
    bool? serverMuted;
    if (selfId != null) {
      for (final VoiceUser u in state.channelState.users) {
        if (u.id == selfId) {
          serverMuted = u.isMuted;
          break;
        }
      }
    }

    final muted = !(serverMuted ?? _localMuted);
    _localMuted = muted;

    channel.sink.add(jsonEncode({
      'type': 'toggle_mute',
      'muted': muted,
    }));
  }
}
/// Manages the local microphone stream and the single WebRTC peer connection
/// used for voice chat, exposing them through [WebRTCState].
///
/// Signaling (sending offers/answers/candidates to the server) is not done
/// here; callers obtain descriptions from [createOffer]/[createAnswer] and
/// feed remote data in via [setRemoteDescription]/[addIceCandidate].
class WebRTCNotifier extends Notifier<WebRTCState> {
  @override
  WebRTCState build() {
    ref.onDispose(_cleanup);
    return WebRTCState();
  }

  /// Releases the media stream and closes the peer connection.
  Future<void> _cleanup() async {
    final pc = state.peerConnection;
    final stream = state.localStream;

    // Detach first: closing the connection emits a state change, and the
    // callback would otherwise write to this notifier while it is going away.
    pc?.onIceConnectionState = null;
    pc?.onIceCandidate = null;
    pc?.onTrack = null;

    await stream?.dispose();
    await pc?.close();
  }

  /// Requests an audio-only local stream and stores it in the state.
  /// Failures are reported through [WebRTCState.error].
  Future<void> initializeLocalStream() async {
    try {
      final Map<String, dynamic> mediaConstraints = {
        'audio': true,
        'video': false,
      };

      final stream = await navigator.mediaDevices.getUserMedia(mediaConstraints);
      state = state.copyWith(localStream: stream);
    } catch (e) {
      state = state.copyWith(error: 'Failed to get media: $e');
    }
  }

  /// Creates the peer connection from [configuration], attaches the local
  /// stream's tracks (if a stream exists) and stores the connection in the
  /// state. A previously stored connection is closed so it does not leak.
  /// Failures are reported through [WebRTCState.error].
  Future<void> initializePeerConnection(
    Map<String, dynamic> configuration,
  ) async {
    RTCPeerConnection? created;
    try {
      final pc = await createPeerConnection(configuration);
      created = pc;

      pc.onIceCandidate = (RTCIceCandidate candidate) {
        // Send ICE candidate to signaling server
        // This will be handled by the voice chat provider
      };

      pc.onIceConnectionState = (RTCIceConnectionState iceState) {
        // Both "connected" and "completed" mean media can flow; a connection
        // commonly moves on to "completed" once ICE has finished checking.
        state = state.copyWith(
          isConnected: iceState ==
                  RTCIceConnectionState.RTCIceConnectionStateConnected ||
              iceState == RTCIceConnectionState.RTCIceConnectionStateCompleted,
        );
      };

      pc.onTrack = (RTCTrackEvent event) {
        if (event.streams.isNotEmpty) {
          // Handle remote audio stream
        }
      };

      // Add local stream to peer connection. Awaited one by one so a failure
      // lands in the catch below instead of becoming an unhandled async error.
      final localStream = state.localStream;
      if (localStream != null) {
        for (final track in localStream.getTracks()) {
          await pc.addTrack(track, localStream);
        }
      }

      final previous = state.peerConnection;
      state = state.copyWith(peerConnection: pc);

      if (previous != null && !identical(previous, pc)) {
        // Silence the old connection before closing it so its "closed" event
        // cannot flip isConnected for the new one.
        previous.onIceConnectionState = null;
        await previous.close();
      }
    } catch (e) {
      // Do not leave a half-configured connection open.
      if (created != null && !identical(state.peerConnection, created)) {
        created.onIceConnectionState = null;
        try {
          await created.close();
        } catch (_) {
          // Best effort; the original failure is what gets reported.
        }
      }
      state = state.copyWith(error: 'Failed to create peer connection: $e');
    }
  }

  /// Creates an SDP offer and installs it as the local description.
  ///
  /// Returns null when there is no peer connection or when creation fails
  /// (the failure is reported through [WebRTCState.error]).
  Future<RTCSessionDescription?> createOffer() async {
    final pc = state.peerConnection;
    if (pc == null) return null;

    try {
      final offer = await pc.createOffer();
      await pc.setLocalDescription(offer);
      return offer;
    } catch (e) {
      state = state.copyWith(error: 'Failed to create offer: $e');
      return null;
    }
  }

  /// Creates an SDP answer and installs it as the local description.
  ///
  /// Returns null when there is no peer connection or when creation fails
  /// (the failure is reported through [WebRTCState.error]).
  Future<RTCSessionDescription?> createAnswer() async {
    final pc = state.peerConnection;
    if (pc == null) return null;

    try {
      final answer = await pc.createAnswer();
      await pc.setLocalDescription(answer);
      return answer;
    } catch (e) {
      state = state.copyWith(error: 'Failed to create answer: $e');
      return null;
    }
  }

  /// Applies the remote peer's session [description]. No-op without a peer
  /// connection; failures are reported through [WebRTCState.error].
  Future<void> setRemoteDescription(RTCSessionDescription description) async {
    final pc = state.peerConnection;
    if (pc == null) return;

    try {
      await pc.setRemoteDescription(description);
    } catch (e) {
      state = state.copyWith(error: 'Failed to set remote description: $e');
    }
  }

  /// Adds a remote ICE [candidate]. No-op without a peer connection; failures
  /// are reported through [WebRTCState.error].
  Future<void> addIceCandidate(RTCIceCandidate candidate) async {
    final pc = state.peerConnection;
    if (pc == null) return;

    try {
      await pc.addCandidate(candidate);
    } catch (e) {
      state = state.copyWith(error: 'Failed to add ICE candidate: $e');
    }
  }

  /// Flips the local mute flag and enables/disables every local audio track
  /// to match. No-op when there is no local stream.
  void toggleMute() {
    final stream = state.localStream;
    if (stream == null) return;

    final newMutedState = !state.isMuted;
    for (final track in stream.getAudioTracks()) {
      // When muted (true), tracks should be disabled (false)
      // When unmuted (false), tracks should be enabled (true)
      track.enabled = !newMutedState;
    }

    state = state.copyWith(isMuted: newMutedState);
  }
}
ConsumerStatefulWidget { + const VoiceChatPage({super.key}); + + @override + ConsumerState createState() => _VoiceChatPageState(); +} + +class _VoiceChatPageState extends ConsumerState { + final TextEditingController _serverUrlController = TextEditingController(); + + @override + void dispose() { + _serverUrlController.dispose(); + super.dispose(); + } + + void _connect() { + final serverUrl = _serverUrlController.text.trim(); + if (serverUrl.isEmpty) { + ScaffoldMessenger.of(context).showSnackBar( + const SnackBar(content: Text('Please enter a server URL')), + ); + return; + } + + final npub = ref.read(ndkProvider).accounts.getPublicKey(); + ref.read(voiceChatProvider.notifier).connect(serverUrl, npub); + } + + void _disconnect() { + ref.read(voiceChatProvider.notifier).disconnect(); + } + + @override + Widget build(BuildContext context) { + final voiceChatState = ref.watch(voiceChatProvider); + final isConnected = voiceChatState.isConnected; + + return Scaffold( + appBar: AppBar( + title: const Text('Voice Chat'), + actions: [ + _buildConnectionStatusIndicator(voiceChatState), + if (isConnected) + IconButton( + icon: Icon(PhosphorIcons.signOut()), + onPressed: _disconnect, + tooltip: 'Disconnect', + ), + ], + ), + body: isConnected + ? 
_buildConnectedView(voiceChatState) + : _buildConnectionView(voiceChatState), + ); + } + + Widget _buildConnectionStatusIndicator(VoiceChatState state) { + Color statusColor; + IconData statusIcon; + String tooltip; + + switch (state.connectionStatus) { + case ConnectionStatus.connecting: + statusColor = Colors.orange; + statusIcon = PhosphorIcons.circleNotch(); + tooltip = 'Connecting...'; + break; + case ConnectionStatus.connected: + statusColor = Colors.green; + statusIcon = PhosphorIcons.wifiHigh(); + tooltip = 'Connected'; + if (state.pingLatencyMs != null) { + tooltip += ' (${state.pingLatencyMs}ms)'; + } + break; + case ConnectionStatus.error: + statusColor = Colors.red; + statusIcon = PhosphorIcons.wifiSlash(); + tooltip = state.error ?? 'Connection error'; + break; + case ConnectionStatus.disconnected: + default: + statusColor = Colors.grey; + statusIcon = PhosphorIcons.wifiSlash(); + tooltip = 'Disconnected'; + } + + return Padding( + padding: const EdgeInsets.symmetric(horizontal: 8.0), + child: Tooltip( + message: tooltip, + child: Icon( + statusIcon, + color: statusColor, + size: 24, + ), + ), + ); + } + + Widget _buildConnectionView(VoiceChatState voiceChatState) { + return Padding( + padding: const EdgeInsets.all(16.0), + child: Column( + mainAxisAlignment: MainAxisAlignment.center, + crossAxisAlignment: CrossAxisAlignment.stretch, + children: [ + Icon( + PhosphorIcons.microphoneStage(), + size: 64, + color: Theme.of(context).colorScheme.primary, + ), + const SizedBox(height: 24), + Text( + 'Connect to Voice Server', + textAlign: TextAlign.center, + style: Theme.of(context).textTheme.headlineSmall, + ), + const SizedBox(height: 32), + TextField( + controller: _serverUrlController, + decoration: InputDecoration( + labelText: 'Server URL', + hintText: 'ws://localhost:8080', + border: const OutlineInputBorder(), + prefixIcon: Icon(PhosphorIcons.globe()), + ), + onSubmitted: (_) => _connect(), + ), + const SizedBox(height: 16), + if (voiceChatState.error 
!= null) + Container( + padding: const EdgeInsets.all(12), + decoration: BoxDecoration( + color: Colors.red.withOpacity(0.1), + borderRadius: BorderRadius.circular(8), + border: Border.all(color: Colors.red), + ), + child: Row( + children: [ + Icon(PhosphorIcons.warning(), color: Colors.red), + const SizedBox(width: 8), + Expanded( + child: Text( + voiceChatState.error!, + style: const TextStyle(color: Colors.red), + ), + ), + ], + ), + ), + if (voiceChatState.connectionStatus == ConnectionStatus.connecting) + Container( + padding: const EdgeInsets.all(12), + decoration: BoxDecoration( + color: Colors.orange.withOpacity(0.1), + borderRadius: BorderRadius.circular(8), + border: Border.all(color: Colors.orange), + ), + child: Row( + children: [ + const SizedBox( + width: 20, + height: 20, + child: CircularProgressIndicator(strokeWidth: 2), + ), + const SizedBox(width: 12), + const Text('Connecting...'), + if (voiceChatState.reconnectAttempts > 0) + Text(' (Attempt ${voiceChatState.reconnectAttempts})'), + ], + ), + ), + const SizedBox(height: 16), + ElevatedButton.icon( + onPressed: voiceChatState.connectionStatus == ConnectionStatus.connecting + ? 
null + : _connect, + icon: Icon(PhosphorIcons.plugsConnected()), + label: const Text('Connect'), + style: ElevatedButton.styleFrom( + padding: const EdgeInsets.symmetric(vertical: 16), + ), + ), + ], + ), + ); + } + + Widget _buildConnectedView(VoiceChatState voiceChatState) { + final rootChannels = voiceChatState.channelState.getChildChannels(null); + + return Column( + children: [ + Expanded( + child: Row( + children: [ + Expanded( + flex: 2, + child: Container( + color: Theme.of(context).colorScheme.surfaceContainerHighest, + child: Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + Padding( + padding: const EdgeInsets.all(16.0), + child: Text( + 'Channels', + style: Theme.of(context).textTheme.titleLarge, + ), + ), + const Divider(height: 1), + Expanded( + child: ListView( + children: [ + for (var channel in rootChannels) + _buildChannelTree(channel, voiceChatState) + ], + ), + ), + ], + ), + ), + ), + Expanded( + flex: 3, + child: _buildCurrentChannelView(voiceChatState), + ), + ], + ), + ), + _buildDebugInfoPanel(voiceChatState), + _buildControlBar(voiceChatState), + ], + ); + } + + Widget _buildDebugInfoPanel(VoiceChatState voiceChatState) { + return ExpansionTile( + title: Row( + children: [ + Icon(PhosphorIcons.info(), size: 16), + const SizedBox(width: 8), + const Text('Connection Debug Info', style: TextStyle(fontSize: 12)), + ], + ), + initiallyExpanded: false, + children: [ + Container( + padding: const EdgeInsets.all(12), + color: Theme.of(context).colorScheme.surfaceContainerHighest, + child: Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + _buildDebugRow('Status', voiceChatState.connectionStatus.name), + _buildDebugRow('Server', voiceChatState.serverUrl ?? 'N/A'), + _buildDebugRow( + 'Ping Latency', + voiceChatState.pingLatencyMs != null + ? '${voiceChatState.pingLatencyMs}ms' + : 'N/A', + ), + _buildDebugRow( + 'Last Ping', + voiceChatState.lastPingTime != null + ? 
_formatTime(voiceChatState.lastPingTime!) + : 'N/A', + ), + _buildDebugRow( + 'Reconnect Attempts', + voiceChatState.reconnectAttempts.toString(), + ), + _buildDebugRow( + 'Users Connected', + voiceChatState.channelState.users.length.toString(), + ), + if (voiceChatState.error != null) + _buildDebugRow('Error', voiceChatState.error!, isError: true), + ], + ), + ), + ], + ); + } + + Widget _buildDebugRow(String label, String value, {bool isError = false}) { + return Padding( + padding: const EdgeInsets.symmetric(vertical: 4), + child: Row( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + SizedBox( + width: 140, + child: Text( + '$label:', + style: TextStyle( + fontWeight: FontWeight.bold, + fontSize: 11, + color: Theme.of(context).colorScheme.onSurface.withOpacity(0.7), + ), + ), + ), + Expanded( + child: Text( + value, + style: TextStyle( + fontSize: 11, + fontFamily: 'monospace', + color: isError ? Colors.red : null, + ), + ), + ), + ], + ), + ); + } + + String _formatTime(DateTime time) { + final now = DateTime.now(); + final diff = now.difference(time); + if (diff.inSeconds < 60) { + return '${diff.inSeconds}s ago'; + } else if (diff.inMinutes < 60) { + return '${diff.inMinutes}m ago'; + } else { + return '${diff.inHours}h ago'; + } + } + + Widget _buildChannelTree(VoiceChannel channel, VoiceChatState voiceChatState) { + final childChannels = voiceChatState.channelState.getChildChannels(channel.id); + final usersInChannel = voiceChatState.channelState.getUsersInChannel(channel.id); + final isCurrentChannel = voiceChatState.channelState.currentChannelId == channel.id; + + return Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + ListTile( + leading: Icon( + childChannels.isEmpty + ? PhosphorIcons.speakerHigh() + : PhosphorIcons.folder(), + size: 20, + ), + title: Text(channel.name), + trailing: usersInChannel.isNotEmpty + ? 
CircleAvatar( + radius: 12, + child: Text( + usersInChannel.length.toString(), + style: const TextStyle(fontSize: 10), + ), + ) + : null, + selected: isCurrentChannel, + onTap: () { + ref.read(voiceChatProvider.notifier).joinChannel(channel.id); + }, + ), + if (childChannels.isNotEmpty) + Padding( + padding: const EdgeInsets.only(left: 24.0), + child: Column( + children: [ + for (var child in childChannels) + _buildChannelTree(child, voiceChatState) + ], + ), + ), + ], + ); + } + + Widget _buildCurrentChannelView(VoiceChatState voiceChatState) { + final currentChannelId = voiceChatState.channelState.currentChannelId; + if (currentChannelId == null) { + return Center( + child: Column( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + Icon( + PhosphorIcons.chatCircle(), + size: 64, + color: Theme.of(context).colorScheme.onSurface.withOpacity(0.3), + ), + const SizedBox(height: 16), + Text( + 'Select a channel to join', + style: Theme.of(context).textTheme.bodyLarge?.copyWith( + color: Theme.of(context).colorScheme.onSurface.withOpacity(0.5), + ), + ), + ], + ), + ); + } + + final channel = voiceChatState.channelState.channels.firstWhere( + (c) => c.id == currentChannelId, + orElse: () => VoiceChannel(id: '', name: '', position: 0), + ); + + final usersInChannel = voiceChatState.channelState.getUsersInChannel(currentChannelId); + + return Column( + crossAxisAlignment: CrossAxisAlignment.start, + children: [ + Padding( + padding: const EdgeInsets.all(16.0), + child: Row( + children: [ + Icon(PhosphorIcons.speakerHigh()), + const SizedBox(width: 8), + Text( + channel.name, + style: Theme.of(context).textTheme.titleLarge, + ), + ], + ), + ), + const Divider(height: 1), + Expanded( + child: ListView.builder( + itemCount: usersInChannel.length, + itemBuilder: (context, index) { + final user = usersInChannel[index]; + return _buildUserTile(user); + }, + ), + ), + _buildControlBar(voiceChatState), + ], + ); + } + + Widget _buildUserTile(VoiceUser user) { + final 
displayNameInitial = user.displayName != null && user.displayName!.isNotEmpty + ? user.displayName!.substring(0, 1).toUpperCase() + : 'U'; + + return ListTile( + leading: Stack( + children: [ + CircleAvatar( + child: Text(displayNameInitial), + ), + if (user.isSpeaking) + Positioned( + right: 0, + bottom: 0, + child: Container( + width: 12, + height: 12, + decoration: BoxDecoration( + color: Colors.green, + shape: BoxShape.circle, + border: Border.all(color: Colors.white, width: 2), + ), + ), + ), + ], + ), + title: Text(user.displayName ?? user.npub ?? 'Anonymous'), + subtitle: Text(user.group.name), + trailing: user.isMuted + ? Icon(PhosphorIcons.microphoneSlash(), size: 20) + : null, + ); + } + + Widget _buildControlBar(VoiceChatState voiceChatState) { + return Container( + padding: const EdgeInsets.all(16.0), + decoration: BoxDecoration( + color: Theme.of(context).colorScheme.surface, + boxShadow: [ + BoxShadow( + color: Colors.black.withOpacity(0.1), + blurRadius: 4, + offset: const Offset(0, -2), + ), + ], + ), + child: Row( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + IconButton.filled( + icon: Icon(PhosphorIcons.microphoneSlash()), + onPressed: () { + ref.read(voiceChatProvider.notifier).toggleMute(); + }, + iconSize: 24, + tooltip: 'Toggle Mute', + ), + const SizedBox(width: 16), + IconButton.filled( + icon: Icon(PhosphorIcons.phoneDisconnect()), + onPressed: _disconnect, + iconSize: 24, + style: IconButton.styleFrom( + backgroundColor: Colors.red, + ), + tooltip: 'Disconnect', + ), + ], + ), + ); + } +} diff --git a/lib/routes.dart b/lib/routes.dart index c25f976d..0501a7ea 100644 --- a/lib/routes.dart +++ b/lib/routes.dart @@ -37,6 +37,7 @@ import 'presentation_layer/routes/nostr/settings/dm_relays/dm_relays_settings.da import 'presentation_layer/routes/messages/dm_list_page.dart'; import 'presentation_layer/routes/messages/dm_thread_page.dart'; import 'presentation_layer/routes/messages/new_dm_page.dart'; +import 
'presentation_layer/routes/voice_chat/voice_chat_page.dart'; Null redirects(BuildContext context, GoRouterState state) { return null; @@ -190,6 +191,10 @@ final routes = [ path: '/nostr/blockedUsers', builder: (context, state) => const BlockedUsers(), ), + GoRoute( + path: '/voice-chat', + builder: (context, state) => const VoiceChatPage(), + ), GoRoute( path: '/edit-starter-pack', builder: (context, state) => EditStarterPack( diff --git a/pubspec.yaml b/pubspec.yaml index 5a1f42b9..e8c8f1b5 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -105,6 +105,7 @@ dependencies: flutter_localizations: sdk: flutter system_theme: ^3.1.2 + flutter_webrtc: ^0.12.7 diff --git a/voice_server/.gitignore b/voice_server/.gitignore new file mode 100644 index 00000000..5a88318c --- /dev/null +++ b/voice_server/.gitignore @@ -0,0 +1,5 @@ +voice_server +*.exe +*.test +*.prof +go.sum diff --git a/voice_server/README.md b/voice_server/README.md new file mode 100644 index 00000000..08e4840f --- /dev/null +++ b/voice_server/README.md @@ -0,0 +1,323 @@ +# Voice Chat Server + +A hybrid WebSocket + WebRTC SFU voice chat server for Camelus, providing low-latency voice communication with channel management. + +## Architecture + +This server uses a **hybrid architecture** that combines: + +1. **WebSocket for API/Signaling**: State changes, channel management, user presence +2. 
**pion/webrtc for SFU (Selective Forwarding Unit)**: Actual audio/video data transmission + +``` +Client ←──WebSocket (ws://)──→ Server (API/State) + ←──WebRTC (SFU)────→ Server (Audio/Video Forwarding) +``` + +## Features + +- **WebSocket-based API**: State changes and channel management +- **WebRTC SFU**: Audio forwarding between participants in same channel +- Tree-like channel structure +- User groups (admin, member, anon) based on Nostr npubs +- Efficient state change broadcasting +- Channel state management +- Speaking indicators +- User mute states +- Low-latency audio forwarding + +## Configuration + +The server is configured via `config.yaml`. See `config.example.yaml` for a detailed example with comments. + +Basic example: +```yaml +server: + host: 0.0.0.0 + port: 8080 + +channels: + - id: lobby + name: Lobby + position: 0 + parent_id: null + +user_groups: + admin: + - npub1example1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + member: + - npub1example2xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + anon: [] +``` + +## Running the Server + +### Prerequisites + +- Go 1.21 or later + +### Installation + +1. Copy the example config: +```bash +cp config.example.yaml config.yaml +``` + +2. Edit `config.yaml` with your channels and user npubs + +3. Build the server: +```bash +go build +``` + +4. Run the server: +```bash +./voice_server -config config.yaml +``` + +You should see: +``` +2026/02/13 11:19:47 Voice server listening on 0.0.0.0:8080 +2026/02/13 11:19:47 WebSocket endpoint: ws://0.0.0.0:8080/ +2026/02/13 11:19:47 WebRTC SFU enabled for audio forwarding +2026/02/13 11:19:47 Ping/Pong enabled for connection keepalive +``` + +## API Protocol + +The server uses a **dual-protocol approach**: + +1. **WebSocket (ws://)**: For API, signaling, and state management +2. **WebRTC**: For audio/video data (SFU) + +### Connection Flow + +#### 1. 
WebSocket Connection (API/Signaling) + +``` +Client → ws://server:8080/ + ← Connected + → {"type": "auth", "npub": "..."} + ← {"type": "state", "data": {...}} +``` + +#### 2. WebRTC Connection (Media) + +``` +Client → {"type": "webrtc_offer", "sdp": {...}} (via WebSocket) + ← {"type": "webrtc_answer", "sdp": {...}} (via WebSocket) + ← ICE candidates exchanged + → Audio tracks sent via WebRTC + ← Audio forwarded to other users in same channel +``` + +### WebSocket Messages + +#### Authentication +```json +{ + "type": "auth", + "npub": "npub1..." +} +``` + +#### Join Channel +```json +{ + "type": "join_channel", + "channel_id": "general" +} +``` + +#### WebRTC Offer (for media connection) +```json +{ + "type": "webrtc_offer", + "sdp": { + "type": "offer", + "sdp": "..." + } +} +``` + +#### ICE Candidate +```json +{ + "type": "webrtc_candidate", + "candidate": {...} +} +``` + +#### Toggle Mute +```json +{ + "type": "toggle_mute", + "muted": true +} +``` + +#### Speaking State +```json +{ + "type": "speaking", + "is_speaking": true +} +``` + +### Server -> Client Messages (via WebSocket) + +#### Initial State +```json +{ + "type": "state", + "data": { + "channels": [...], + "users": [...] + } +} +``` + +#### User Joined +```json +{ + "type": "user_joined", + "data": { + "user": {...} + } +} +``` + +#### User Left +```json +{ + "type": "user_left", + "data": { + "user_id": "user_123" + } +} +``` + +#### User Moved +```json +{ + "type": "user_moved", + "data": { + "user_id": "user_123", + "channel_id": "general" + } +} +``` + +#### User Speaking +```json +{ + "type": "user_speaking", + "data": { + "user_id": "user_123", + "is_speaking": true + } +} +``` + +#### WebRTC Answer (response to offer) +```json +{ + "type": "webrtc_answer", + "sdp": { + "type": "answer", + "sdp": "..." 
+ } +} +``` + +#### ICE Candidate (for NAT traversal) +```json +{ + "type": "webrtc_candidate", + "candidate": {...} +} +``` + +#### Error +```json +{ + "type": "error", + "message": "Error description" +} +``` + +## Server Architecture + +The server uses a **hybrid architecture** combining: + +### Components + +- **WebSocket Handler**: Manages persistent connections for API/signaling +- **WebRTC SFU**: Forwards audio tracks between users in same channel +- **Channel Manager**: Handles channel state and user assignments +- **User Manager**: Manages user states and permissions +- **Broadcaster**: Efficiently distributes state changes via WebSocket + +### Why Hybrid Architecture? + +**WebSocket for Signaling**: +- Persistent bi-directional connection +- Low overhead for state updates +- Simple JSON-based API +- Easy to debug and monitor + +**WebRTC for Media**: +- Native audio/video support +- Built-in encryption (DTLS) +- Efficient binary data transfer +- NAT traversal with ICE/STUN +- SFU enables multi-party audio forwarding + +## Security Considerations + +- Authentication is based on Nostr npub +- CORS is currently open for development (should be restricted in production) +- User groups determine permissions (future: implement permission-based actions) +- WebSocket connections should be secured with TLS (wss://) in production +- WebRTC connections are automatically encrypted (DTLS) +- Uses STUN server for ICE candidates (Google's public STUN server) +- For production, consider using TURN servers for NAT traversal + +## Development + +### Testing Locally + +1. Start the server: +```bash +go run main.go +``` + +2. 
Connect from the Camelus app: + - Navigate to Voice Chat in the drawer + - Enter server URL: `ws://localhost:8080` + - Client establishes WebSocket for signaling + - Client creates WebRTC peer connection for audio + - Send WebRTC offer via WebSocket + - Receive answer and start audio transmission + +### Customizing Channels + +Edit `config.yaml` to add/modify channels. Channels support: +- Hierarchical structure via `parent_id` +- Ordering via `position` +- Unique IDs for client reference + +## Future Enhancements + +- [x] WebSocket for API/signaling +- [x] WebRTC SFU for audio forwarding +- [ ] Improved SFU with proper track management +- [ ] Video support +- [ ] Channel permissions based on user groups +- [ ] Persistent channel state +- [ ] Audio quality settings +- [ ] Screen sharing support +- [ ] Text chat per channel +- [ ] Channel creation/deletion API +- [ ] User kick/ban functionality +- [ ] TURN server configuration for better NAT traversal diff --git a/voice_server/config.example.yaml b/voice_server/config.example.yaml new file mode 100644 index 00000000..cdb33eec --- /dev/null +++ b/voice_server/config.example.yaml @@ -0,0 +1,91 @@ +server: + host: 0.0.0.0 + port: 8080 + +# Channel Configuration +# Channels support a tree-like hierarchy through parent_id references +# Set parent_id to null for root channels +channels: + # Root channel - Lobby + - id: lobby + name: "🏠 Lobby" + position: 0 + parent_id: null + + # Child of Lobby + - id: general + name: "💬 General" + position: 0 + parent_id: lobby + + # Root channel - Gaming + - id: gaming + name: "🎮 Gaming" + position: 1 + parent_id: null + + # Children of Gaming + - id: gaming-voice-1 + name: "🎤 Voice 1" + position: 0 + parent_id: gaming + + - id: gaming-voice-2 + name: "🎤 Voice 2" + position: 1 + parent_id: gaming + + # Root channel - Music + - id: music + name: "🎵 Music" + position: 2 + parent_id: null + + # Children of Music + - id: music-listening + name: "🎧 Listening Room" + position: 0 + parent_id: 
music + + - id: music-jam + name: "🎸 Jam Session" + position: 1 + parent_id: music + +# User Groups Configuration +# Users are identified by their Nostr npub +# Groups determine permissions (future: implement permission-based actions) +user_groups: + # Administrators - full control + admin: + - npub1admin1examplexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + - npub1admin2examplexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + + # Members - regular users + member: + - npub1member1examplexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + - npub1member2examplexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + - npub1member3examplexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + + # Anonymous users - no specific npubs required + # Any user not in admin or member list will be treated as anon + anon: [] + +# Future Configuration Options (not yet implemented) +# permissions: +# admin: +# - create_channel +# - delete_channel +# - kick_user +# - ban_user +# member: +# - join_channel +# - speak +# anon: +# - join_channel +# - speak +# +# audio: +# codec: opus +# bitrate: 64000 +# sample_rate: 48000 diff --git a/voice_server/config.yaml b/voice_server/config.yaml new file mode 100644 index 00000000..4fd7ed2f --- /dev/null +++ b/voice_server/config.yaml @@ -0,0 +1,36 @@ +server: + host: 0.0.0.0 + port: 8080 + +channels: + - id: lobby + name: Lobby + position: 0 + parent_id: null + + - id: general + name: General + position: 1 + parent_id: lobby + + - id: gaming + name: Gaming + position: 0 + parent_id: null + + - id: gaming-voice-1 + name: Voice 1 + position: 0 + parent_id: gaming + + - id: gaming-voice-2 + name: Voice 2 + position: 1 + parent_id: gaming + +user_groups: + admin: + - npub1example1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + member: + - npub1example2xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + anon: [] # Anonymous users - no specific npubs diff --git a/voice_server/go.mod b/voice_server/go.mod new file mode 100644 index 00000000..095a5642 --- /dev/null +++ 
b/voice_server/go.mod @@ -0,0 +1,32 @@ +module github.com/camelus-hq/camelus/voice_server + +go 1.21 + +require ( + github.com/gorilla/websocket v1.5.3 + github.com/pion/webrtc/v4 v4.0.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/pion/datachannel v1.5.9 // indirect + github.com/pion/dtls/v3 v3.0.3 // indirect + github.com/pion/ice/v4 v4.0.2 // indirect + github.com/pion/interceptor v0.1.37 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/mdns/v2 v2.0.7 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.14 // indirect + github.com/pion/rtp v1.8.9 // indirect + github.com/pion/sctp v1.8.33 // indirect + github.com/pion/sdp/v3 v3.0.9 // indirect + github.com/pion/srtp/v3 v3.0.4 // indirect + github.com/pion/stun/v3 v3.0.0 // indirect + github.com/pion/transport/v3 v3.0.7 // indirect + github.com/pion/turn/v4 v4.0.0 // indirect + github.com/wlynxg/anet v0.0.3 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.26.0 // indirect +) diff --git a/voice_server/main.go b/voice_server/main.go new file mode 100644 index 00000000..25dc59f8 --- /dev/null +++ b/voice_server/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net/http" + "os" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + "github.com/camelus-hq/camelus/voice_server/pkg/server" + "gopkg.in/yaml.v3" +) + +var ( + configFile = flag.String("config", "config.yaml", "Path to config file") +) + +func loadConfig(filename string) (*models.Config, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + var config models.Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, err + } + + return &config, nil +} + +func main() { + flag.Parse() + + config, err := loadConfig(*configFile) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + 
+ srv := server.NewServer(*config) + + // WebSocket endpoint for API/signaling + http.HandleFunc("/", srv.HandleWebSocket) + + addr := fmt.Sprintf("%s:%d", config.Server.Host, config.Server.Port) + log.Printf("Voice server listening on %s", addr) + log.Printf("WebSocket endpoint: ws://%s/", addr) + log.Printf("WebRTC SFU enabled for audio forwarding") + log.Printf("Ping/Pong enabled for connection keepalive") + + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("Failed to start server: %v", err) + } +} diff --git a/voice_server/pkg/README.md b/voice_server/pkg/README.md new file mode 100644 index 00000000..81222b33 --- /dev/null +++ b/voice_server/pkg/README.md @@ -0,0 +1,87 @@ +# Voice Server Package Structure + +This directory contains the organized voice chat server implementation. + +## Directory Structure + +``` +voice_server/ +├── main.go # Application entry point +├── go.mod # Go module definition +├── config.yaml # Server configuration +├── config.example.yaml # Example configuration +└── pkg/ # Server packages + ├── models/ # Data structures + │ └── types.go # Config, User, Channel, Message types + ├── websocket/ # WebSocket utilities + │ └── ping.go # Ping/pong handlers & connection keepalive + └── server/ # Main server logic + ├── server.go # Server struct & WebSocket handler + ├── handlers.go # Message routing & handlers + ├── broadcast.go # State update & broadcast functions + └── webrtc.go # WebRTC SFU implementation +``` + +## Key Features + +### WebSocket Connection Management +- **Ping/Pong**: Automatic keepalive every 54 seconds +- **Timeouts**: 60s read timeout, 10s write timeout +- **CORS**: Configured for development (restrict in production) +- **Compression**: Enabled for better performance + +### Application-Level Ping +- Client sends `ping` messages every 30s with timestamp +- Server responds with `pong` including the same timestamp +- Allows client to calculate round-trip latency + +### Message Handling +All messages flow 
through WebSocket as JSON: +- `auth` - User authentication +- `join_channel` - Join voice channel +- `toggle_mute` - Mute/unmute +- `speaking` - Speaking indicator +- `ping/pong` - Connection keepalive & latency tracking +- `webrtc_offer/answer/candidate` - WebRTC signaling + +### WebRTC SFU +- Receives audio tracks from each user +- Forwards audio to other users in same channel +- ICE/STUN for NAT traversal +- DTLS encryption built-in + +## Development + +### Build +```bash +go build +``` + +### Run +```bash +./voice_server -config config.yaml +``` + +### Test +```bash +go test ./... +``` + +## Configuration + +See `config.example.yaml` for a complete example with all options. + +## Logging + +The server logs: +- Connection events (connect, disconnect, errors) +- User actions (join channel, mute, speak) +- WebRTC events (track received, ICE state changes) +- Ping/pong status for debugging + +## Production Deployment + +1. **CORS**: Restrict `CheckOrigin` in `server.go` +2. **TLS**: Use reverse proxy (nginx) for `wss://` +3. **TURN**: Add TURN servers for better NAT traversal +4. 
**Monitoring**: Add metrics/logging for production diff --git a/voice_server/pkg/models/types.go b/voice_server/pkg/models/types.go new file mode 100644 index 00000000..8768b9b1 --- /dev/null +++ b/voice_server/pkg/models/types.go @@ -0,0 +1,60 @@ +package models + +import ( + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" +) + +// Config structures +type ServerConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` +} + +type Channel struct { + ID string `yaml:"id" json:"id"` + Name string `yaml:"name" json:"name"` + ParentID *string `yaml:"parent_id" json:"parent_id"` + Position int `yaml:"position" json:"position"` + UserIDs []string `json:"user_ids"` +} + +type UserGroups struct { + Admin []string `yaml:"admin"` + Member []string `yaml:"member"` + Anon []string `yaml:"anon"` +} + +type Config struct { + Server ServerConfig `yaml:"server"` + Channels []Channel `yaml:"channels"` + UserGroups UserGroups `yaml:"user_groups"` +} + +// Runtime structures +type User struct { + ID string `json:"id"` + Npub *string `json:"npub"` + DisplayName *string `json:"display_name"` + Group string `json:"group"` + ChannelID *string `json:"channel_id"` + IsSpeaking bool `json:"is_speaking"` + IsMuted bool `json:"is_muted"` + WSConn *websocket.Conn `json:"-"` // WebSocket for signaling/API + PeerConnection *webrtc.PeerConnection `json:"-"` // WebRTC for media (SFU) +} + +type Message struct { + Type string `json:"type"` + Data map[string]interface{} `json:"data,omitempty"` + ChannelID *string `json:"channel_id,omitempty"` + UserID *string `json:"user_id,omitempty"` + IsSpeaking *bool `json:"is_speaking,omitempty"` + Muted *bool `json:"muted,omitempty"` + Npub *string `json:"npub,omitempty"` + Message string `json:"message,omitempty"` + Timestamp *int64 `json:"timestamp,omitempty"` + // WebRTC signaling + SDP *webrtc.SessionDescription `json:"sdp,omitempty"` + Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"` +} diff --git 
a/voice_server/pkg/server/broadcast.go b/voice_server/pkg/server/broadcast.go new file mode 100644 index 00000000..69ec744d --- /dev/null +++ b/voice_server/pkg/server/broadcast.go @@ -0,0 +1,125 @@ +package server + +import ( + "log" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + wsutil "github.com/camelus-hq/camelus/voice_server/pkg/websocket" +) + +func (s *Server) sendStateUpdate(user *models.User) { + s.mu.RLock() + defer s.mu.RUnlock() + + channels := make([]interface{}, 0, len(s.channels)) + for _, ch := range s.channels { + channels = append(channels, map[string]interface{}{ + "id": ch.ID, + "name": ch.Name, + "parent_id": ch.ParentID, + "position": ch.Position, + "user_ids": ch.UserIDs, + }) + } + + users := make([]interface{}, 0, len(s.users)) + for _, u := range s.users { + users = append(users, map[string]interface{}{ + "id": u.ID, + "npub": u.Npub, + "display_name": u.DisplayName, + "group": u.Group, + "channel_id": u.ChannelID, + "is_speaking": u.IsSpeaking, + "is_muted": u.IsMuted, + }) + } + + msg := models.Message{ + Type: "state", + Data: map[string]interface{}{ + "channels": channels, + "users": users, + }, + } + + if err := wsutil.SendMessage(user.WSConn, msg); err != nil { + log.Printf("Failed to send state to user %s: %v", user.ID, err) + } +} + +func (s *Server) broadcastUserJoined(user *models.User) { + msg := models.Message{ + Type: "user_joined", + Data: map[string]interface{}{ + "user": map[string]interface{}{ + "id": user.ID, + "npub": user.Npub, + "display_name": user.DisplayName, + "group": user.Group, + "channel_id": user.ChannelID, + "is_speaking": user.IsSpeaking, + "is_muted": user.IsMuted, + }, + }, + } + + s.broadcast(msg, user.ID) +} + +func (s *Server) broadcastUserLeft(userID string) { + msg := models.Message{ + Type: "user_left", + Data: map[string]interface{}{ + "user_id": userID, + }, + } + + s.broadcast(msg, "") +} + +func (s *Server) broadcastUserMoved(userID string, channelID string) { + msg := 
models.Message{ + Type: "user_moved", + Data: map[string]interface{}{ + "user_id": userID, + "channel_id": channelID, + }, + } + + s.broadcast(msg, "") +} + +func (s *Server) broadcastUserSpeaking(userID string, isSpeaking bool) { + msg := models.Message{ + Type: "user_speaking", + Data: map[string]interface{}{ + "user_id": userID, + "is_speaking": isSpeaking, + }, + } + + s.broadcast(msg, "") +} + +func (s *Server) broadcastStateUpdate() { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, user := range s.users { + s.sendStateUpdate(user) + } +} + +func (s *Server) broadcast(msg models.Message, excludeUserID string) { + s.mu.RLock() + defer s.mu.RUnlock() + + for _, user := range s.users { + if user.ID != excludeUserID && user.WSConn != nil { + if err := wsutil.SendMessage(user.WSConn, msg); err != nil { + log.Printf("Failed to broadcast to user %s: %v", user.ID, err) + } + } + } +} diff --git a/voice_server/pkg/server/handlers.go b/voice_server/pkg/server/handlers.go new file mode 100644 index 00000000..3e80321f --- /dev/null +++ b/voice_server/pkg/server/handlers.go @@ -0,0 +1,115 @@ +package server + +import ( + "log" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + wsutil "github.com/camelus-hq/camelus/voice_server/pkg/websocket" +) + +func (s *Server) handleMessage(user *models.User, msg *models.Message) { + switch msg.Type { + case "join_channel": + if msg.ChannelID != nil { + s.handleJoinChannel(user, *msg.ChannelID) + } + case "toggle_mute": + if msg.Muted != nil { + s.handleToggleMute(user, *msg.Muted) + } + case "speaking": + if msg.IsSpeaking != nil { + s.handleSpeaking(user, *msg.IsSpeaking) + } + case "ping": + // Handle ping from client and respond with pong + if err := wsutil.HandlePing(user.WSConn, msg.Timestamp); err != nil { + log.Printf("Failed to send pong to user %s: %v", user.ID, err) + } + case "webrtc_offer": + // Handle WebRTC offer for media connection + if msg.SDP != nil { + s.handleWebRTCOffer(user, msg.SDP) + } + case 
"webrtc_candidate": + // Handle ICE candidate + if msg.Candidate != nil { + s.handleICECandidate(user, msg.Candidate) + } + } +} + +func (s *Server) handleJoinChannel(user *models.User, channelID string) { + s.mu.Lock() + defer s.mu.Unlock() + + // Check if channel exists + channel, exists := s.channels[channelID] + if !exists { + s.sendError(user, "Channel not found") + return + } + + // Remove from old channel + if user.ChannelID != nil && *user.ChannelID != channelID { + s.removeUserFromChannel(user.ID, *user.ChannelID) + } + + // Add to new channel + user.ChannelID = &channelID + found := false + for _, uid := range channel.UserIDs { + if uid == user.ID { + found = true + break + } + } + if !found { + channel.UserIDs = append(channel.UserIDs, user.ID) + } + + // Broadcast user moved + s.broadcastUserMoved(user.ID, channelID) +} + +func (s *Server) handleToggleMute(user *models.User, muted bool) { + s.mu.Lock() + user.IsMuted = muted + s.mu.Unlock() + + // Broadcast state change + s.broadcastStateUpdate() +} + +func (s *Server) handleSpeaking(user *models.User, isSpeaking bool) { + s.mu.Lock() + user.IsSpeaking = isSpeaking + s.mu.Unlock() + + // Broadcast speaking state + s.broadcastUserSpeaking(user.ID, isSpeaking) +} + +func (s *Server) removeUserFromChannel(userID string, channelID string) { + channel, exists := s.channels[channelID] + if !exists { + return + } + + for i, uid := range channel.UserIDs { + if uid == userID { + channel.UserIDs = append(channel.UserIDs[:i], channel.UserIDs[i+1:]...) 
+ break + } + } +} + +func (s *Server) sendError(user *models.User, message string) { + msg := models.Message{ + Type: "error", + Message: message, + } + if err := wsutil.SendMessage(user.WSConn, msg); err != nil { + log.Printf("Failed to send error to user %s: %v", user.ID, err) + } +} diff --git a/voice_server/pkg/server/server.go b/voice_server/pkg/server/server.go new file mode 100644 index 00000000..a9097569 --- /dev/null +++ b/voice_server/pkg/server/server.go @@ -0,0 +1,193 @@ +package server + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + wsutil "github.com/camelus-hq/camelus/voice_server/pkg/websocket" + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" +) + +type Server struct { + config models.Config + users map[string]*models.User + channels map[string]*models.Channel + mu sync.RWMutex + upgrader websocket.Upgrader + api *webrtc.API +} + +func NewServer(config models.Config) *Server { + channels := make(map[string]*models.Channel) + for i := range config.Channels { + ch := &config.Channels[i] + ch.UserIDs = []string{} + channels[ch.ID] = ch + } + + // Create WebRTC API for SFU + mediaEngine := &webrtc.MediaEngine{} + + // Register codecs for audio + if err := mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{ + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: 48000, + Channels: 2, + SDPFmtpLine: "minptime=10;useinbandfec=1", + }, + PayloadType: 111, + }, webrtc.RTPCodecTypeAudio); err != nil { + log.Printf("Failed to register Opus codec: %v", err) + } + + api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) + + return &Server{ + config: config, + users: make(map[string]*models.User), + channels: channels, + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // Allow all origins for development + // In production, restrict to specific origins + return true + }, + // Enable 
compression + EnableCompression: true, + }, + api: api, + } +} + +func (s *Server) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + // Set proper headers for WebSocket upgrade + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Upgrade, Connection") + + if r.Method == "OPTIONS" { + w.WriteHeader(http.StatusOK) + return + } + + conn, err := s.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("Failed to upgrade connection: %v", err) + return + } + defer conn.Close() + + // Configure ping/pong + wsutil.SetupPingPong(conn) + + user := &models.User{ + ID: generateUserID(), + WSConn: conn, + } + + log.Printf("New WebSocket connection from %s (User ID: %s)", r.RemoteAddr, user.ID) + + // Start ping loop + pingDone := make(chan struct{}) + go wsutil.StartPingLoop(conn, pingDone) + defer close(pingDone) + + // Read authentication message + _, msg, err := conn.ReadMessage() + if err != nil { + log.Printf("Failed to read auth message: %v", err) + return + } + + var authMsg models.Message + if err := json.Unmarshal(msg, &authMsg); err != nil { + log.Printf("Failed to parse auth message: %v", err) + return + } + + if authMsg.Type != "auth" { + log.Printf("Expected auth message, got %s", authMsg.Type) + return + } + + user.Npub = authMsg.Npub + user.Group = s.getUserGroup(user.Npub) + user.DisplayName = authMsg.Npub + + s.mu.Lock() + s.users[user.ID] = user + s.mu.Unlock() + + defer func() { + s.mu.Lock() + if user.ChannelID != nil { + s.removeUserFromChannel(user.ID, *user.ChannelID) + } + delete(s.users, user.ID) + s.mu.Unlock() + + // Close WebRTC connection if exists + if user.PeerConnection != nil { + user.PeerConnection.Close() + } + + s.broadcastUserLeft(user.ID) + log.Printf("User %s disconnected", user.ID) + }() + + // Send initial state + s.sendStateUpdate(user) + + // Broadcast user joined + 
s.broadcastUserJoined(user) + + // Handle messages + for { + _, msg, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("WebSocket error: %v", err) + } + break + } + + var message models.Message + if err := json.Unmarshal(msg, &message); err != nil { + log.Printf("Failed to parse message: %v", err) + continue + } + + s.handleMessage(user, &message) + } +} + +func (s *Server) getUserGroup(npub *string) string { + if npub == nil { + return "anon" + } + + for _, adminNpub := range s.config.UserGroups.Admin { + if adminNpub == *npub { + return "admin" + } + } + + for _, memberNpub := range s.config.UserGroups.Member { + if memberNpub == *npub { + return "member" + } + } + + return "anon" +} + +func generateUserID() string { + return fmt.Sprintf("user_%d", time.Now().UnixNano()) +} diff --git a/voice_server/pkg/server/webrtc.go b/voice_server/pkg/server/webrtc.go new file mode 100644 index 00000000..15d65612 --- /dev/null +++ b/voice_server/pkg/server/webrtc.go @@ -0,0 +1,142 @@ +package server + +import ( + "log" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + wsutil "github.com/camelus-hq/camelus/voice_server/pkg/websocket" + "github.com/pion/webrtc/v4" +) + +// WebRTC SFU handling +func (s *Server) handleWebRTCOffer(user *models.User, offer *webrtc.SessionDescription) { + log.Printf("Handling WebRTC offer from user %s", user.ID) + + // Create peer connection for media + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + } + + pc, err := s.api.NewPeerConnection(config) + if err != nil { + log.Printf("Failed to create peer connection: %v", err) + s.sendError(user, "Failed to create peer connection") + return + } + + user.PeerConnection = pc + + // Handle incoming tracks (audio from this user) + pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { 
+ log.Printf("Got track from user %s: %s", user.ID, track.Codec().MimeType) + + // SFU: Forward this track to other users in the same channel + go s.forwardTrackToChannel(user, track) + }) + + // Handle ICE candidates + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + if candidate == nil { + return + } + + // Send ICE candidate to client via WebSocket + candidateJSON := candidate.ToJSON() + msg := models.Message{ + Type: "webrtc_candidate", + Candidate: &candidateJSON, + } + if err := wsutil.SendMessage(user.WSConn, msg); err != nil { + log.Printf("Failed to send ICE candidate to user %s: %v", user.ID, err) + } + }) + + // Handle connection state changes + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + log.Printf("ICE Connection State for user %s: %s", user.ID, state.String()) + }) + + // Set remote description (offer from client) + if err := pc.SetRemoteDescription(*offer); err != nil { + log.Printf("Failed to set remote description: %v", err) + s.sendError(user, "Failed to set remote description") + return + } + + // Create answer + answer, err := pc.CreateAnswer(nil) + if err != nil { + log.Printf("Failed to create answer: %v", err) + s.sendError(user, "Failed to create answer") + return + } + + // Set local description + if err := pc.SetLocalDescription(answer); err != nil { + log.Printf("Failed to set local description: %v", err) + s.sendError(user, "Failed to set local description") + return + } + + // Send answer back to client via WebSocket + msg := models.Message{ + Type: "webrtc_answer", + SDP: &answer, + } + if err := wsutil.SendMessage(user.WSConn, msg); err != nil { + log.Printf("Failed to send answer to user %s: %v", user.ID, err) + } +} + +func (s *Server) handleICECandidate(user *models.User, candidate *webrtc.ICECandidateInit) { + if user.PeerConnection == nil { + log.Printf("No peer connection for user %s", user.ID) + return + } + + if err := user.PeerConnection.AddICECandidate(*candidate); err != nil { + 
log.Printf("Failed to add ICE candidate: %v", err) + } +} + +// SFU: Read RTP packets from the track and pick the other users in the source user's channel; forwarding itself is not implemented yet (only logged) +func (s *Server) forwardTrackToChannel(sourceUser *models.User, track *webrtc.TrackRemote) { + // Read RTP packets from source + buf := make([]byte, 1500) + for { + _, _, err := track.Read(buf) + if err != nil { + log.Printf("Track read error for user %s: %v", sourceUser.ID, err) + return + } + + // Forward to all users in the same channel + s.mu.RLock() + channelID := sourceUser.ChannelID + if channelID == nil { + s.mu.RUnlock() + continue + } + + for _, user := range s.users { + // Don't forward to self, and only to users in same channel + if user.ID == sourceUser.ID || user.ChannelID == nil || *user.ChannelID != *channelID { + continue + } + + if user.PeerConnection == nil { + continue + } + + // Forward the RTP packet to this user + // Note: In a real SFU, you'd want to create tracks and manage them properly + // This is a simplified version + log.Printf("Forwarding audio from %s to %s", sourceUser.ID, user.ID) + } + s.mu.RUnlock() + } +} diff --git a/voice_server/pkg/websocket/ping.go b/voice_server/pkg/websocket/ping.go new file mode 100644 index 00000000..738e8140 --- /dev/null +++ b/voice_server/pkg/websocket/ping.go @@ -0,0 +1,72 @@ +package websocket + +import ( + "encoding/json" + "log" + "time" + + "github.com/camelus-hq/camelus/voice_server/pkg/models" + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer + pongWait = 60 * time.Second + + // Send pings to peer with this period. 
Must be less than pongWait + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer + maxMessageSize = 512 * 1024 +) + +// SendMessage sends a message to a WebSocket connection with proper timeout +func SendMessage(conn *websocket.Conn, msg models.Message) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + + conn.SetWriteDeadline(time.Now().Add(writeWait)) + return conn.WriteMessage(websocket.TextMessage, data) +} + +// SetupPingPong configures ping/pong handlers for a WebSocket connection +func SetupPingPong(conn *websocket.Conn) { + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) +} + +// StartPingLoop sends a ping every pingPeriod until done is signalled/closed or a write fails; it blocks, so callers run it in its own goroutine +func StartPingLoop(conn *websocket.Conn, done chan struct{}) { + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + log.Printf("Ping error: %v", err) + return + } + case <-done: + return + } + } +} + +// HandlePing handles a ping message and responds with pong including timestamp +func HandlePing(conn *websocket.Conn, timestamp *int64) error { + pongMsg := models.Message{ + Type: "pong", + Timestamp: timestamp, + } + return SendMessage(conn, pongMsg) +}