1
- import { map , filter , scan , distinctUntilChanged , takeUntil , mapTo , tap } from 'rxjs/operators'
2
- import { ISocketStream , socketStream } from '../core/socket-stream'
3
- import { reader , IResult } from 'typescript-monads'
4
- import { Observable , timer } from 'rxjs'
5
- import { IProbeConfig } from '../config/config.interface'
6
- import { Strings , Numbers } from '../core/interfaces'
1
+ import { createSocket , RemoteInfo } from 'dgram'
2
+ import { Strings , Numbers , IProbeConfig , DEFAULT_PROBE_CONFIG } from './interfaces'
3
+ import { Observable , Observer , fromEvent , timer } from 'rxjs'
4
+ import { first , shareReplay , map , distinctUntilChanged , mapTo , takeWhile , takeUntil , scan } from 'rxjs/operators'
7
5
6
+ type IMessage = readonly [ Buffer , RemoteInfo ]
8
7
type TimestampMessages = readonly TimestampedMessage [ ]
9
8
type StringDictionary = { readonly [ key : string ] : string }
10
9
interface TimestampedMessage { readonly msg : string , readonly ts : number }
11
10
interface BufferPort { readonly buffer : Buffer , readonly port : number , readonly address : string }
12
11
13
- const flattenXml = ( str : string ) => str . replace ( / > \s * / g, '>' ) . replace ( / \s * < / g, '<' )
14
12
const mapStringToBuffer = ( str : string ) => Buffer . from ( str , 'utf8' )
13
+ const flattenXml = ( str : string ) => str . replace ( / > \s * / g, '>' ) . replace ( / \s * < / g, '<' )
15
14
const toArrayOfValues = < T extends StringDictionary > ( source : Observable < T > ) => source . pipe ( map ( a => Object . keys ( a ) . map ( b => a [ b ] ) ) )
16
15
const flattenDocumentStrings = ( source : Observable < Strings > ) => source . pipe ( map ( a => a . map ( flattenXml ) ) )
17
- const filterOkResults = < TOk , TFail > ( source : Observable < IResult < TOk , TFail > > ) => source . pipe ( filter ( a => a . isOk ( ) ) )
18
- const timestamp = < TFail > ( source : Observable < IResult < Buffer , TFail > > ) => source . pipe ( map < IResult < Buffer , TFail > , TimestampedMessage > ( a => ( { msg : a . unwrap ( ) . toString ( ) , ts : Date . now ( ) } ) ) )
16
+ const timestamp = ( source : Observable < Buffer > ) => source . pipe ( map < Buffer , TimestampedMessage > ( a => ( { msg : a . toString ( ) , ts : Date . now ( ) } ) ) )
19
17
const distinctUntilObjectChanged = < T > ( source : Observable < T > ) => source . pipe ( distinctUntilChanged ( ( a , b ) => {
20
18
const keys1 = Object . keys ( a )
21
19
const keys2 = Object . keys ( b )
22
-
23
- return keys1 . length === keys2 . length && keys1 . reduce ( ( acc : boolean , curr ) => {
24
- return acc === false ? false : keys2 . includes ( curr ) as boolean
25
- } , true )
20
+
21
+ return keys1 . length === keys2 . length &&
22
+ keys1 . reduce ( ( acc : boolean , curr ) => acc === false ? false : keys2 . includes ( curr ) as boolean , true )
26
23
} ) )
27
24
28
25
const accumulateFreshMessages =
@@ -42,27 +39,27 @@ export const flattenBuffersWithInfo =
42
39
ports . reduce ( ( acc , port ) =>
43
40
[ ...acc , ...buffers . map ( buffer => ( { buffer, port, address } ) ) ] , [ ] as readonly BufferPort [ ] )
44
41
45
- export const initSocketStream = reader < IProbeConfig , ISocketStream > ( c => socketStream ( 'udp4' , c . PROBE_NETWORK_TIMEOUT_MS , c . distinctFilterFn ) )
46
-
47
42
export const probe =
48
- ( socket : ISocketStream ) =>
49
- ( ports : Numbers ) =>
50
- ( address : string ) =>
51
- ( messagesToSend : Strings = [ ] ) =>
52
- ( mapFn : ( msg : readonly TimestampedMessage [ ] ) => StringDictionary ) =>
53
- reader < IProbeConfig , Observable < Strings > > ( cfg => {
54
- timer ( 0 , cfg . PROBE_SAMPLE_TIME_MS ) . pipe (
55
- mapTo ( flattenBuffersWithInfo ( ports ) ( address ) ( messagesToSend . map ( mapStringToBuffer ) ) ) ,
56
- takeUntil ( socket . close$ ) )
57
- . subscribe ( bfrPorts => bfrPorts . forEach ( mdl => socket . socket . send ( mdl . buffer , 0 , mdl . buffer . length , mdl . port , mdl . address ) ) )
43
+ ( config ?: Partial < IProbeConfig > ) =>
44
+ ( messages : Strings ) : Observable < Strings > =>
45
+ Observable . create ( ( obs : Observer < Strings > ) => {
46
+ const cfg = { ...DEFAULT_PROBE_CONFIG , ...( config || { } ) }
47
+ const socket = createSocket ( { type : 'udp4' } )
48
+ const socketClosed$ = fromEvent < void > ( socket , 'close' ) . pipe ( first ( ) , shareReplay ( 1 ) )
49
+ const socketMessages$ = fromEvent < IMessage > ( socket , 'message' ) . pipe ( map ( a => a [ 0 ] ) , shareReplay ( 1 ) )
50
+
51
+ timer ( 0 , cfg . PROBE_REQUEST_SAMPLE_RATE_MS ) . pipe (
52
+ mapTo ( flattenBuffersWithInfo ( cfg . PORTS ) ( cfg . MULTICAST_ADDRESS ) ( messages . map ( mapStringToBuffer ) ) ) ,
53
+ takeWhile ( ( ) => ! obs . closed ) ,
54
+ takeUntil ( socketClosed$ ) )
55
+ . subscribe ( bfrPorts => bfrPorts . forEach ( mdl => socket . send ( mdl . buffer , 0 , mdl . buffer . length , mdl . port , mdl . address ) ) )
58
56
59
- return socket . messages$ . pipe (
60
- filterOkResults ,
61
- timestamp ,
62
- accumulateFreshMessages ( cfg . FALLOUT_MS ) ,
63
- mapStrToDictionary ( mapFn ) ,
64
- distinctUntilObjectChanged ,
65
- toArrayOfValues ,
66
- flattenDocumentStrings
67
- )
68
- } )
57
+ socketMessages$ . pipe (
58
+ timestamp ,
59
+ accumulateFreshMessages ( cfg . PROBE_RESPONSE_FALLOUT_MS ) ,
60
+ mapStrToDictionary ( cfg . RESULT_DEDUPE_FN ) ,
61
+ distinctUntilObjectChanged ,
62
+ toArrayOfValues ,
63
+ flattenDocumentStrings
64
+ ) . subscribe ( msg => obs . next ( msg ) )
65
+ } )
0 commit comments