Skip to content

Commit 6f9b7c7

Browse files
feat: introduce terminateProbe command (#37)
1 parent 26dc084 commit 6f9b7c7

File tree

8 files changed

+64
-34
lines changed

8 files changed

+64
-34
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,14 @@ $ camera-probe
5656

5757
## Programmatic Usage
5858
```js
59-
import { onvifDevices$ } from 'camera-probe'
59+
import { onvifDevices$, terminateProbe } from 'camera-probe'
6060

6161
onvifDevices$.subscribe(console.log)
6262

63+
// be sure to close the socket connection when complete with your query
64+
// This is a tad awkward until a better solution to stopping the inner observables is achieved.
65+
terminateProbe()
66+
6367
// results
6468
[ { name: 'Amcrest',
6569
hardware: 'IP2M-8200',

src/core/probe.ts

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { createSocket, RemoteInfo } from 'dgram'
22
import { Strings, Numbers, IProbeConfig, DEFAULT_PROBE_CONFIG } from './interfaces'
33
import { Observable, Observer, fromEvent, timer } from 'rxjs'
4-
import { first, shareReplay, map, distinctUntilChanged, mapTo, takeWhile, takeUntil, scan } from 'rxjs/operators'
4+
import { shareReplay, map, distinctUntilChanged, mapTo, takeUntil, scan } from 'rxjs/operators'
55

66
type IMessage = readonly [Buffer, RemoteInfo]
77
type TimestampMessages = readonly TimestampedMessage[]
@@ -18,7 +18,7 @@ const distinctUntilObjectChanged = <T>(source: Observable<T>) => source.pipe(dis
1818
const keys1 = Object.keys(a)
1919
const keys2 = Object.keys(b)
2020

21-
return keys1.length === keys2.length &&
21+
return keys1.length === keys2.length &&
2222
keys1.reduce((acc: boolean, curr) => acc === false ? false : keys2.includes(curr) as boolean, true)
2323
}))
2424

@@ -41,25 +41,34 @@ export const flattenBuffersWithInfo =
4141

4242
export const probe =
4343
(config?: Partial<IProbeConfig>) =>
44-
(messages: Strings): Observable<Strings> =>
45-
Observable.create((obs: Observer<Strings>) => {
46-
const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
47-
const socket = createSocket({ type: 'udp4' })
48-
const socketClosed$ = fromEvent<void>(socket, 'close').pipe(first(), shareReplay(1))
49-
const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))
44+
(messages: Strings) =>
45+
(until: Observable<any>): Observable<Strings> =>
46+
Observable.create((obs: Observer<Strings>) => {
47+
const cfg = { ...DEFAULT_PROBE_CONFIG, ...(config || {}) }
48+
const socket = createSocket({ type: 'udp4' })
49+
const socketMessages$ = fromEvent<IMessage>(socket, 'message').pipe(map(a => a[0]), shareReplay(1))
50+
51+
socket.on('err', err => obs.error(err))
52+
socket.on('close', () => obs.complete())
53+
54+
timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
55+
mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
56+
takeUntil(until))
57+
.subscribe(bfrPorts => {
58+
bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address))
59+
})
5060

51-
timer(0, cfg.PROBE_REQUEST_SAMPLE_RATE_MS).pipe(
52-
mapTo(flattenBuffersWithInfo(cfg.PORTS)(cfg.MULTICAST_ADDRESS)(messages.map(mapStringToBuffer))),
53-
takeWhile(() => !obs.closed),
54-
takeUntil(socketClosed$))
55-
.subscribe(bfrPorts => bfrPorts.forEach(mdl => socket.send(mdl.buffer, 0, mdl.buffer.length, mdl.port, mdl.address)))
56-
57-
socketMessages$.pipe(
58-
timestamp,
59-
accumulateFreshMessages(cfg.PROBE_RESPONSE_FALLOUT_MS),
60-
mapStrToDictionary(cfg.RESULT_DEDUPE_FN),
61-
distinctUntilObjectChanged,
62-
toArrayOfValues,
63-
flattenDocumentStrings
64-
).subscribe(msg => obs.next(msg))
65-
})
61+
socketMessages$.pipe(
62+
timestamp,
63+
accumulateFreshMessages(cfg.PROBE_RESPONSE_FALLOUT_MS),
64+
mapStrToDictionary(cfg.RESULT_DEDUPE_FN),
65+
distinctUntilObjectChanged,
66+
toArrayOfValues,
67+
flattenDocumentStrings,
68+
takeUntil(until)
69+
).subscribe(msg => obs.next(msg), undefined, () => {
70+
setTimeout(() => {
71+
socket.close()
72+
}, 1000)
73+
})
74+
})

src/index.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11

22
import { map, shareReplay } from 'rxjs/operators'
33
import { onvifProbe } from './onvif/onvif-probe'
4+
import { Subject } from 'rxjs'
45

56
export * from './onvif/device'
67

7-
export const onvifProbe$ = onvifProbe().pipe(shareReplay(1))
8+
const end = new Subject()
9+
const end$ = end.asObservable()
10+
11+
export const onvifProbe$ = onvifProbe()(end$).pipe(shareReplay(1))
812
export const onvifDevices$ = onvifProbe$.pipe(map(a => a.map(b => b.device)))
913
export const onvifResponses$ = onvifProbe$.pipe(map(a => a.map(b => b.raw)))
1014

15+
export const terminateProbe = () => end.next()
16+
1117
export const cli = () => {
1218
return onvifDevices$
1319
.subscribe(res => {
@@ -25,7 +31,6 @@ export const cli = () => {
2531
}))
2632
})
2733
}
28-
2934
// interface IReponse {
3035
// devices: [
3136
// {

src/onvif/onvif-probe.spec.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { createSocket } from 'dgram'
22
import { onvifProbe } from './onvif-probe'
3+
import { Subject } from 'rxjs'
34

45
const ipcam = '<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope" xmlns:SOAP-ENC="http://www.w3.org/2003/05/soap-encoding" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsdd="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:vfdis="http://www.onvif.org/ver10/network/wsdl/RemoteDiscoveryBinding" xmlns:vfdis2="http://www.onvif.org/ver10/network/wsdl/DiscoveryLookupBinding" xmlns:tdn="http://www.onvif.org/ver10/network/wsdl"><SOAP-ENV:Header><wsa:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:MessageID><wsa:RelatesTo>uuid:NetworkVideoTransmitter</wsa:RelatesTo><wsa:ReplyTo SOAP-ENV:mustUnderstand="true"><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo><wsa:To SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:To><wsa:Action SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</wsa:Action></SOAP-ENV:Header><SOAP-ENV:Body><wsdd:ProbeMatches><wsdd:ProbeMatch><wsa:EndpointReference><wsa:Address>urn:uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</wsa:Address><wsa:ReferenceProperties></wsa:ReferenceProperties><wsa:ReferenceParameters></wsa:ReferenceParameters><wsa:PortType>ttl</wsa:PortType></wsa:EndpointReference><wsdd:Types>tdn:4655721b-4e0e-4296-ba0b-3180423b5b0c</wsdd:Types><wsdd:Scopes>onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/Model/631GA onvif://www.onvif.org/Name/IPCAM onvif://www.onvif.org/location/country/china</wsdd:Scopes><wsdd:XAddrs>http://192.168.1.1:80/onvif/device_service</wsdd:XAddrs><wsdd:MetadataVersion>1</wsdd:MetadataVersion></wsdd:ProbeMatch></wsdd:ProbeMatches></SOAP-ENV:Body></SOAP-ENV:Envelope>'
56
const amcrest = '<?xml version="1.0" encoding="utf-8" standalone="yes" ?><s:Envelope xmlns:sc="http://www.w3.org/2003/05/soap-encoding" xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:dn="http://www.onvif.org/ver10/network/wsdl" xmlns:tds="http://www.onvif.org/ver10/device/wsdl" xmlns:d="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing"><s:Header><a:MessageID>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:MessageID><a:To>urn:schemas-xmlsoap-org:ws:2005:04:discovery</a:To><a:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</a:Action><a:RelatesTo>uuid:Device</a:RelatesTo></s:Header><s:Body><d:ProbeMatches><d:ProbeMatch><a:EndpointReference><a:Address>uuid:8eceb0ca-564e-4436-bec5-e63ea243c529</a:Address></a:EndpointReference><d:Types>dn:NetworkVideoTransmitter tds:Device</d:Types><d:Scopes>onvif://www.onvif.org/location/country/china onvif://www.onvif.org/name/Amcrest onvif://www.onvif.org/hardware/IP2M-841B onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/type/Network_Video_Transmitter onvif://www.onvif.org/extension/unique_identifier</d:Scopes><d:XAddrs>http://192.168.1.235/onvif/device_service</d:XAddrs><d:MetadataVersion>1</d:MetadataVersion></d:ProbeMatch></d:ProbeMatches></s:Body></s:Envelope>'
@@ -25,9 +26,10 @@ const config = (port: number) => {
2526
describe('onvif-probe', () => {
2627
it('should handle IPCAM - 631GA', done => {
2728
const port = 41241
29+
const end = new Subject()
2830

2931
initTestServer(port)(ipcam)
30-
onvifProbe(config(port))
32+
onvifProbe(config(port))(end)
3133
.subscribe(res => {
3234
expect(res[0].device).toEqual({
3335
name: 'IPCAM',
@@ -46,15 +48,17 @@ describe('onvif-probe', () => {
4648
profiles: ['Streaming'],
4749
xaddrs: ['http://192.168.1.1:80/onvif/device_service']
4850
})
51+
end.next()
4952
done()
5053
})
5154
})
5255

5356
it('should handle AMCREST - IP2M-841B', done => {
5457
const port = 41242
58+
const end = new Subject()
5559

5660
initTestServer(port)(amcrest)
57-
onvifProbe(config(port))
61+
onvifProbe(config(port))(end)
5862
.subscribe(res => {
5963
expect(res[0].device).toEqual({
6064
name: 'Amcrest',
@@ -74,6 +78,7 @@ describe('onvif-probe', () => {
7478
profiles: ['Streaming'],
7579
xaddrs: ['http://192.168.1.235/onvif/device_service']
7680
})
81+
end.next()
7782
done()
7883
})
7984
})

src/onvif/onvif-probe.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ export type IOnvifProbeResponseModels = readonly IOnvifProbeResponseModel[]
1212
export type IOnvifProbeResponse = Observable<IOnvifProbeResponseModels>
1313

1414
export const onvifProbe =
15-
(config?: Partial<IWsProbeConfig>): IOnvifProbeResponse =>
16-
wsProbe(config)
15+
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IOnvifProbeResponse =>
16+
wsProbe(config)(until)
1717
.pipe(map(res => res.map(a => {
1818
return {
1919
...a,

src/upnp/upnp-probe.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { createSocket } from 'dgram'
22
import { upnpProbe } from './upnp-probe'
3+
import { Subject } from 'rxjs'
34

45
const initTestServer = (port: number) => {
56
const server = createSocket('udp4')
@@ -12,8 +13,9 @@ const initTestServer = (port: number) => {
1213
describe.skip('upnp probe', () => {
1314
it.skip('ddddd', done => {
1415
const port = 1900
16+
const end = new Subject()
1517

16-
upnpProbe({ PORTS: [port] })
18+
upnpProbe({ PORTS: [port] })(end)
1719
.subscribe(res => {
1820
console.log(res)
1921
})

src/ws-discovery/ws-probe.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { createSocket } from 'dgram'
22
import { wsProbe } from './ws-probe'
3+
import { Subject } from 'rxjs'
34

45
const wsXmlResponse = '<?xml version="1.0" encoding="UTF-8"?><SOAP-ENV:Envelope xmlns:SOAP-ENV="http://www.w3.org/2003/05/soap-envelope" xmlns:SOAP-ENC="http://www.w3.org/2003/05/soap-encoding" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:wsdd="http://schemas.xmlsoap.org/ws/2005/04/discovery" xmlns:vfdis="http://www.onvif.org/ver10/network/wsdl/RemoteDiscoveryBinding" xmlns:vfdis2="http://www.onvif.org/ver10/network/wsdl/DiscoveryLookupBinding" xmlns:tdn="http://www.onvif.org/ver10/network/wsdl"><SOAP-ENV:Header><wsa:MessageID>uuid:2709d68a-7dc1-61c2-a205-X3018101811662</wsa:MessageID><wsa:RelatesTo>uuid:NetworkVideoTransmitter</wsa:RelatesTo><wsa:ReplyTo SOAP-ENV:mustUnderstand="true"><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo><wsa:To SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:To><wsa:Action SOAP-ENV:mustUnderstand="true">http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches</wsa:Action></SOAP-ENV:Header><SOAP-ENV:Body><wsdd:ProbeMatches><wsdd:ProbeMatch><wsa:EndpointReference><wsa:Address>urn:uuid:2419d68a-2dd2-21b2-a205-X2018101811779</wsa:Address><wsa:ReferenceProperties></wsa:ReferenceProperties><wsa:ReferenceParameters></wsa:ReferenceParameters><wsa:PortType>ttl</wsa:PortType></wsa:EndpointReference><wsdd:Types>tdn:4655721b-4e0e-4296-ba0b-3180423b5b0c</wsdd:Types><wsdd:Scopes>onvif://www.onvif.org/Profile/Streaming onvif://www.onvif.org/Model/631GA onvif://www.onvif.org/Name/IPCAM onvif://www.onvif.org/location/country/china</wsdd:Scopes><wsdd:XAddrs>http://192.168.1.1:80/onvif/device_service</wsdd:XAddrs><wsdd:MetadataVersion>1</wsdd:MetadataVersion></wsdd:ProbeMatch></wsdd:ProbeMatches></SOAP-ENV:Body></SOAP-ENV:Envelope>'
56

@@ -24,14 +25,17 @@ const config = (port: number) => {
2425
describe('ws probe', () => {
2526
it('should probe basic, distinct', done => {
2627
const port = 41251
28+
const end = new Subject()
29+
const end$ = end.asObservable()
2730

2831
initTestServer(port)
29-
wsProbe(config(port))
32+
wsProbe(config(port))(end$)
3033
.subscribe(res => {
3134
const res1 = res[0]
3235
expect(res.length).toEqual(1)
3336
expect(res1).toBeDefined()
3437
expect(res1.doc).toBeDefined()
38+
end.next()
3539
done()
3640
})
3741
})

src/ws-discovery/ws-probe.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import { map } from 'rxjs/operators'
44
import { probe } from '../core/probe'
55
import { IWsProbeConfig, IWsResponse } from './ws-probe.interfaces'
66
import { DEFAULT_WS_PROBE_CONFIG } from './config'
7+
import { Observable } from 'rxjs'
78

89
const mapDeviceStrToPayload = (str: string) => generateWsDiscoveryProbePayload(str)(generateGuid())
910
const mapDevicesToPayloads = (devices: readonly string[]) => devices.map(mapDeviceStrToPayload)
1011

1112
export const wsProbe =
12-
(config?: Partial<IWsProbeConfig>): IWsResponse => {
13+
(config?: Partial<IWsProbeConfig>) => (until: Observable<any>): IWsResponse => {
1314
const cfg = { ...DEFAULT_WS_PROBE_CONFIG, ...config } as IWsProbeConfig
14-
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))
15+
return probe(cfg)(mapDevicesToPayloads(cfg.DEVICES))(until)
1516
.pipe(map(b => {
1617
return b.map(raw => {
1718
return {

0 commit comments

Comments
 (0)