From 8037f7b974f3d9138284e32be888a481884ea9e9 Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Wed, 10 Apr 2024 10:06:46 -0400 Subject: [PATCH] RSDK-7045 Implement StreamTicks (#189) * service * stream ticks * override * comment * PR feedback * add extra --- lib/src/components/board/board.dart | 19 +++++- lib/src/components/board/client.dart | 20 +++++++ lib/src/components/board/service.dart | 23 +++++++- test/unit_test/components/board_test.dart | 71 +++++++++++++++++++++++ 4 files changed, 129 insertions(+), 4 deletions(-) diff --git a/lib/src/components/board/board.dart b/lib/src/components/board/board.dart index 1776960631c..d8a512f7626 100644 --- a/lib/src/components/board/board.dart +++ b/lib/src/components/board/board.dart @@ -1,7 +1,9 @@ +import 'dart:collection'; + import 'package:fixnum/fixnum.dart'; import '../../gen/common/v1/common.pb.dart' as common; -import '../../gen/component/board/v1/board.pbenum.dart'; +import '../../gen/component/board/v1/board.pbgrpc.dart'; import '../../resource/base.dart'; import '../../robot/client.dart'; @@ -26,6 +28,15 @@ class BoardStatus { } } +/// Tick of a digital interrupt +class Tick { + String pinName; + bool high; + Int64 time; + + Tick({required this.pinName, required this.high, required this.time}); +} + /// Board represents a physical general purpose compute board that contains various /// components such as analog readers, and digital interrupts. abstract class Board extends Resource { @@ -58,6 +69,12 @@ abstract class Board extends Resource { /// Return the current value of the interrupt which is based on the type of Interrupt. Future digitalInterruptValue(String digitalInterruptName, {Map? extra}); + // Stream digital interrupts ticks. + Stream streamTicks(List interrupts, {Map? extra}); + + // addCallbacks adds a listener for the digital interrupts. + Future addCallbacks(List interrupts, Queue tickQueue, {Map? extra}); + /// Set the board to the indicated power mode. Future setPowerMode(PowerMode powerMode, int seconds, int nanos, {Map? extra}); diff --git a/lib/src/components/board/client.dart b/lib/src/components/board/client.dart index 86b2e68352c..fe84abc7df1 100644 --- a/lib/src/components/board/client.dart +++ b/lib/src/components/board/client.dart @@ -1,3 +1,6 @@ +import 'dart:async'; +import 'dart:collection'; + import 'package:fixnum/fixnum.dart'; import 'package:grpc/grpc_connection_interface.dart'; @@ -120,6 +123,23 @@ class BoardClient extends Board implements ResourceRPCClient { return response.value.toInt(); } + @override + Future addCallbacks(List interrupts, Queue tickQueue, {Map? extra}) async { + // addCallbacks not implemented on client side since it is a helper for StreamTicks. + throw UnimplementedError(); + } + + @override + Stream streamTicks(List interrupts, {Map? extra}) { + final response = client.streamTicks(StreamTicksRequest() + ..name = name + ..pinNames.addAll(interrupts) + ..extra = extra?.toStruct() ?? Struct()); + + final stream = response.map((resp) => Tick(pinName: resp.pinName, high: resp.high, time: resp.time)); + return stream.asBroadcastStream(onCancel: (_) => response.cancel()); + } + @override Future setPowerMode(PowerMode powerMode, int seconds, int nanos, {Map? extra}) async { final duration = grpc_duration.Duration() diff --git a/lib/src/components/board/service.dart b/lib/src/components/board/service.dart index 9bed223d70c..ed76b1632bb 100644 --- a/lib/src/components/board/service.dart +++ b/lib/src/components/board/service.dart @@ -1,3 +1,5 @@ +import 'dart:collection'; + import 'package:fixnum/fixnum.dart'; import 'package:grpc/grpc.dart'; @@ -112,8 +114,23 @@ class BoardService extends BoardServiceBase { } @override - Stream streamTicks(ServiceCall call, StreamTicksRequest request) { - // TODO: implement streamTicks - throw UnimplementedError(); + Stream streamTicks(ServiceCall call, StreamTicksRequest request) async* { + final board = _fromManager(request.name); + + final ticks = Queue(); + await board.addCallbacks(request.pinNames, ticks); + + try { + while (true) { + await Future.delayed(const Duration(microseconds: 1)); + if (ticks.isNotEmpty) { + final tick = ticks.first; + ticks.removeFirst(); + yield StreamTicksResponse(pinName: tick.pinName, high: tick.high, time: tick.time); + } + } + } catch (error) { + rethrow; + } } } diff --git a/test/unit_test/components/board_test.dart b/test/unit_test/components/board_test.dart index b32a472da95..1b4c821d59e 100644 --- a/test/unit_test/components/board_test.dart +++ b/test/unit_test/components/board_test.dart @@ -1,3 +1,5 @@ +import 'dart:collection'; + import 'package:fixnum/fixnum.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:grpc/grpc.dart'; @@ -16,6 +18,7 @@ class FakeBoard extends Board { final Map analogMap = {'pin': 0}; final BoardStatus boardStatus = const BoardStatus({'1': 0}, {'1': 0}); PowerMode powerMode = PowerMode.POWER_MODE_NORMAL; + final Map> tickCallbackMap = {}; Map? extra; @override @@ -94,6 +97,24 @@ class FakeBoard extends Board { this.extra = extra; analogMap[pin] = value; } + + @override + // Stream digital interrupts ticks. + Stream streamTicks(List interrupts, {Map? extra}) { + throw UnimplementedError(); + } + + @override + Future addCallbacks(List interrupts, Queue tickQueue, {Map? extra}) async { + for (final i in interrupts) { + tickCallbackMap[i] = tickQueue; + } + } + + Future tick(Tick tick) async { + final queue = tickCallbackMap[tick.pinName]; + queue?.add(tick); + } } void main() { @@ -115,6 +136,13 @@ void main() { expect(await board.digitalInterruptValue('1'), expected); }); + test('addCallbacks', () async { + final tickQueue = Queue(); + final interrupts = ['1']; + await board.addCallbacks(interrupts, tickQueue); + expect(board.tickCallbackMap['1'], tickQueue); + }); + test('gpio', () async { const expected = false; expect(await board.gpio('pin'), expected); @@ -223,6 +251,29 @@ void main() { expect(response.value.toInt(), expected); }); + test('streamTicks', () async { + final client = BoardServiceClient(channel); + + final request = StreamTicksRequest() + ..name = name + ..pinNames.add('1'); + + final stream = client.streamTicks(request); + + // Give time for server to start streaming. + await Future.delayed(const Duration(milliseconds: 100)); + + final tick1 = Tick(pinName: '1', high: true, time: Int64(1000)); + await board.tick(tick1); + await for (var resp in stream) { + expect(resp.pinName, '1'); + expect(resp.high, true); + expect(resp.time, Int64(1000)); + break; + } + await stream.cancel(); + }); + test('gpio', () async { final client = BoardServiceClient(channel); const expected = false; @@ -345,6 +396,26 @@ void main() { const expected = 0; expect(await client.digitalInterruptValue('1'), Int64(expected)); }); + test('streamTicks', () async { + final client = BoardClient(name, channel); + + final stream = client.streamTicks(['1']); + + // Give time for server to start streaming. + await Future.delayed(const Duration(milliseconds: 100)); + + final testTick = Tick(pinName: '1', high: true, time: Int64(1000)); + await board.tick(testTick); + + final sub = stream.listen(null); + + sub.onData((tick) async { + expect(tick.pinName, testTick.pinName); + expect(tick.high, testTick.high); + expect(tick.time, testTick.time); + await sub.cancel(); + }); + }); test('gpio', () async { final client = BoardClient(name, channel);