Skip to content

Commit 9cbff9c

Browse files
committed
Use reflection
1 parent b6db553 commit 9cbff9c

File tree

6 files changed

+92
-29
lines changed

6 files changed

+92
-29
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ build-buf: $(node_modules) clean-buf
5757
$(buf) generate buf.build/googleapis/googleapis
5858
$(buf) generate buf.build/viamrobotics/api:$$(cat api_version.lock) --path common,component,robot,service,app,provisioning,tagger,stream
5959
$(buf) generate buf.build/viamrobotics/goutils
60+
$(buf) generate buf.build/grpc/grpc --path grpc/reflection/v1/reflection.proto
6061

6162
# js targets
6263

examples/vanilla/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/robot/client.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,15 @@ export class RobotClient extends EventDispatcher implements Robot {
170170
this.onDisconnect();
171171
}
172172
);
173-
this.sessionManager = new SessionManager((): Transport => {
174-
if (!this.transport) {
175-
throw new Error(RobotClient.notConnectedYetStr);
173+
this.sessionManager = new SessionManager(
174+
this.serviceHost,
175+
(): Transport => {
176+
if (!this.transport) {
177+
throw new Error(RobotClient.notConnectedYetStr);
178+
}
179+
return this.transport;
176180
}
177-
return this.transport;
178-
});
181+
);
179182

180183
// For each connection event type, add a listener to capture that
181184
// event and re-emit it with the 'connectionstatechange' event

src/robot/session-manager.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const mockGetHeartBeatWindow = new Duration({
2727

2828
describe('SessionManager', () => {
2929
beforeEach(() => {
30-
sm = new SessionManager(() => mockTransport);
30+
sm = new SessionManager('', () => mockTransport);
3131
});
3232

3333
it('no session initially', () => {

src/robot/session-manager.ts

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,23 @@
1+
/* eslint-disable max-depth */
2+
import {
3+
BinaryReader,
4+
FileDescriptorProto,
5+
MethodOptions,
6+
} from '@bufbuild/protobuf';
17
import {
28
Code,
39
ConnectError,
410
createClient,
511
type Transport,
612
} from '@connectrpc/connect';
13+
import { createAsyncIterable } from '@connectrpc/connect/protocol';
14+
import { safety_heartbeat_monitored as safteyHeartbeatMonitored } from '../gen/common/v1/common_pb';
15+
import { ServerReflection } from '../gen/grpc/reflection/v1/reflection_connect';
16+
import {
17+
FileDescriptorResponse,
18+
ListServiceResponse,
19+
ServerReflectionRequest,
20+
} from '../gen/grpc/reflection/v1/reflection_pb';
721
import { RobotService } from '../gen/robot/v1/robot_connect';
822
import {
923
SendSessionHeartbeatRequest,
@@ -22,29 +36,14 @@ const timeoutBlob = new Blob(
2236
);
2337

2438
export default class SessionManager {
25-
public readonly transport: Transport;
39+
public static heartbeatMonitoredMethods: Record<string, boolean> = {};
2640

27-
public static readonly heartbeatMonitoredMethods = new Set<string>([
28-
'/viam.component.arm.v1.ArmService/MoveToPosition',
29-
'/viam.component.arm.v1.ArmService/MoveToJointPositions',
30-
'/viam.component.arm.v1.ArmService/MoveThroughJointPositions',
31-
'/viam.component.base.v1.BaseService/MoveStraight',
32-
'/viam.component.base.v1.BaseService/Spin',
33-
'/viam.component.base.v1.BaseService/SetPower',
34-
'/viam.component.base.v1.BaseService/SetVelocity',
35-
'/viam.component.gantry.v1.GantryService/MoveToPosition',
36-
'/viam.component.gripper.v1.GripperService/Open',
37-
'/viam.component.gripper.v1.GripperService/Grab',
38-
'/viam.component.motor.v1.MotorService/SetPower',
39-
'/viam.component.motor.v1.MotorService/GoFor',
40-
'/viam.component.motor.v1.MotorService/GoTo',
41-
'/viam.component.motor.v1.MotorService/SetRPM',
42-
'/viam.component.servo.v1.ServoService/Move',
43-
]);
41+
public readonly transport: Transport;
4442

4543
private currentSessionID = '';
4644
private sessionsSupported: boolean | undefined;
4745
private heartbeatIntervalMs: number | undefined;
46+
private host = '';
4847

4948
private starting: Promise<void> | undefined;
5049

@@ -53,7 +52,11 @@ export default class SessionManager {
5352
return createClient(RobotService, transport);
5453
}
5554

56-
constructor(private deferredTransport: () => Transport) {
55+
constructor(
56+
host: string,
57+
private deferredTransport: () => Transport
58+
) {
59+
this.host = host;
5760
this.transport = new SessionTransport(this.deferredTransport, this);
5861
}
5962

@@ -185,6 +188,7 @@ export default class SessionManager {
185188
(Number(heartbeatWindow.seconds) * 1e3 +
186189
heartbeatWindow.nanos / 1e6) /
187190
5;
191+
await this.applyHeartbeatMonitoredMethods();
188192
resolve();
189193
this.heartbeat().catch(console.error); // eslint-disable-line no-console
190194
})()
@@ -198,4 +202,59 @@ export default class SessionManager {
198202

199203
return this.getSessionMetadataInner();
200204
}
205+
206+
private async applyHeartbeatMonitoredMethods(): Promise<void> {
207+
const client = createClient(ServerReflection, this.transport);
208+
const request = new ServerReflectionRequest({
209+
host: this.host,
210+
messageRequest: { case: 'listServices', value: '' },
211+
});
212+
const responseStream = client.serverReflectionInfo(
213+
createAsyncIterable([request]),
214+
{ timeoutMs: 10_000 }
215+
);
216+
for await (const serviceResponse of responseStream) {
217+
const fdpRequests = (
218+
serviceResponse.messageResponse.value as ListServiceResponse
219+
).service.map((service) => {
220+
return new ServerReflectionRequest({
221+
messageRequest: { case: 'fileContainingSymbol', value: service.name },
222+
});
223+
});
224+
const fdpResponseStream = client.serverReflectionInfo(
225+
createAsyncIterable(fdpRequests),
226+
{ timeoutMs: 10_000 }
227+
);
228+
for await (const fdpResponse of fdpResponseStream) {
229+
for (const fdp of (
230+
fdpResponse.messageResponse.value as FileDescriptorResponse
231+
).fileDescriptorProto) {
232+
const protoFile = FileDescriptorProto.fromBinary(fdp);
233+
for (const service of protoFile.service) {
234+
for (const method of service.method) {
235+
SessionManager.heartbeatMonitoredMethods[
236+
`/${protoFile.package}.${service.name}/${method.name}`
237+
] = SessionManager.hasHeartbeatOption(method.options);
238+
}
239+
}
240+
}
241+
}
242+
}
243+
}
244+
245+
private static hasHeartbeatOption(options?: MethodOptions): boolean {
246+
if (!options) {
247+
return false;
248+
}
249+
const reader = new BinaryReader(options.toBinary());
250+
while (reader.pos < reader.len) {
251+
const tag = reader.tag();
252+
const [fieldNumber] = tag;
253+
if (fieldNumber === safteyHeartbeatMonitored.field.no) {
254+
return true;
255+
}
256+
reader.string();
257+
}
258+
return false;
259+
}
201260
}

src/robot/session-transport.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export default class SessionTransport implements Transport {
2020
constructor(
2121
protected readonly deferredTransport: () => Transport,
2222
protected readonly sessionManager: SessionManager
23-
) {}
23+
) { }
2424

2525
private async getSessionMetadata(): Promise<Headers> {
2626
try {
@@ -51,7 +51,7 @@ export default class SessionTransport implements Transport {
5151
): Promise<UnaryResponse<I, O>> {
5252
const newHeaders = cloneHeaders(header);
5353
const methodPath = `/${service.typeName}/${method.name}`;
54-
if (SessionManager.heartbeatMonitoredMethods.has(methodPath)) {
54+
if (SessionManager.heartbeatMonitoredMethods[methodPath] ?? false) {
5555
const md = await this.getSessionMetadata();
5656
for (const [key, value] of md) {
5757
newHeaders.set(key, value);
@@ -82,7 +82,7 @@ export default class SessionTransport implements Transport {
8282
): Promise<StreamResponse<I, O>> {
8383
const newHeaders = cloneHeaders(header);
8484
const methodPath = `/${service.typeName}/${method.name}`;
85-
if (SessionManager.heartbeatMonitoredMethods.has(methodPath)) {
85+
if (SessionManager.heartbeatMonitoredMethods[methodPath] ?? false) {
8686
const md = await this.getSessionMetadata();
8787
for (const [key, value] of md) {
8888
newHeaders.set(key, value);

0 commit comments

Comments
 (0)