4
4
5
5
'use strict' ;
6
6
7
+ const ints = require ( 'buffer-more-ints' )
7
8
var defs = require ( './defs' ) ;
8
9
var constants = defs . constants ;
9
10
var decode = defs . decode ;
10
11
11
- var Bits = require ( '@acuminous/bitsyntax' ) ;
12
-
13
12
module . exports . PROTOCOL_HEADER = "AMQP" + String . fromCharCode ( 0 , 0 , 9 , 1 ) ;
14
13
15
14
/*
@@ -33,74 +32,141 @@ FRAME_HEADER = constants.FRAME_HEADER,
33
32
FRAME_BODY = constants . FRAME_BODY ,
34
33
FRAME_END = constants . FRAME_END ;
35
34
36
- var bodyCons =
37
- Bits . builder ( FRAME_BODY ,
38
- 'channel:16, size:32, payload:size/binary' ,
39
- FRAME_END ) ;
35
+ // expected byte sizes for frame parts
36
+ const TYPE_BYTES = 1
37
+ const CHANNEL_BYTES = 2
38
+ const SIZE_BYTES = 4
39
+ const FRAME_HEADER_BYTES = TYPE_BYTES + CHANNEL_BYTES + SIZE_BYTES
40
+ const FRAME_END_BYTES = 1
41
+
42
+ /**
43
+ * @typedef {{
44
+ * type: number,
45
+ * channel: number,
46
+ * size: number,
47
+ * payload: Buffer,
48
+ * rest: Buffer
49
+ * }} FrameStructure
50
+ */
51
+
52
+ /**
53
+ * This is a polyfill which will read a big int 64 bit as a number.
54
+ * @arg { Buffer } buffer
55
+ * @arg { number } offset
56
+ * @returns { number }
57
+ */
58
+ function readInt64BE ( buffer , offset ) {
59
+ /**
60
+ * We try to use native implementation if available here because
61
+ * buffer-more-ints does not
62
+ */
63
+ if ( typeof Buffer . prototype . readBigInt64BE === 'function' ) {
64
+ return Number ( buffer . readBigInt64BE ( offset ) )
65
+ }
66
+
67
+ return ints . readInt64BE ( buffer , offset )
68
+ }
40
69
41
70
// %%% TESTME possibly better to cons the first bit and write the
42
71
// second directly, in the absence of IO lists
43
- module . exports . makeBodyFrame = function ( channel , payload ) {
44
- return bodyCons ( { channel : channel , size : payload . length , payload : payload } ) ;
45
- } ;
72
+ /**
73
+ * Make a frame header
74
+ * @arg { number } channel
75
+ * @arg { Buffer } payload
76
+ */
77
+ module . exports . makeBodyFrame = function ( channel , payload ) {
78
+ const frameSize = FRAME_HEADER_BYTES + payload . length + FRAME_END_BYTES
79
+
80
+ const frame = Buffer . alloc ( frameSize )
81
+
82
+ let offset = 0
83
+
84
+ offset = frame . writeUInt8 ( FRAME_BODY , offset )
85
+ offset = frame . writeUInt16BE ( channel , offset )
86
+ offset = frame . writeInt32BE ( payload . length , offset )
46
87
47
- var frameHeaderPattern = Bits . matcher ( 'type:8' , 'channel:16' ,
48
- 'size:32' , 'rest/binary' ) ;
88
+ payload . copy ( frame , offset )
89
+ offset += payload . length
49
90
91
+ frame . writeUInt8 ( FRAME_END , offset )
92
+
93
+ return frame
94
+ } ;
95
+
96
+ /**
97
+ * Parse an AMQP frame
98
+ * @arg { Buffer } bin
99
+ * @arg { number } max
100
+ * @returns { FrameStructure | boolean }
101
+ */
50
102
function parseFrame ( bin , max ) {
51
- var fh = frameHeaderPattern ( bin ) ;
52
- if ( fh ) {
53
- var size = fh . size , rest = fh . rest ;
54
- if ( size > max ) {
55
- throw new Error ( 'Frame size exceeds frame max' ) ;
56
- }
57
- else if ( rest . length > size ) {
58
- if ( rest [ size ] !== FRAME_END )
59
- throw new Error ( 'Invalid frame' ) ;
60
-
61
- return {
62
- type : fh . type ,
63
- channel : fh . channel ,
64
- size : size ,
65
- payload : rest . slice ( 0 , size ) ,
66
- rest : rest . slice ( size + 1 )
67
- } ;
68
- }
103
+ if ( bin . length < FRAME_HEADER_BYTES ) {
104
+ return false
69
105
}
70
- return false ;
71
- }
72
106
73
- module . exports . parseFrame = parseFrame ;
107
+ const type = bin . readUInt8 ( 0 )
108
+ const channel = bin . readUInt16BE ( 1 )
109
+ const size = bin . readUInt32BE ( 3 )
74
110
75
- var headerPattern = Bits . matcher ( 'class:16' ,
76
- '_weight:16' ,
77
- 'size:64' ,
78
- 'flagsAndfields/binary' ) ;
111
+ if ( size > max ) {
112
+ throw new Error ( 'Frame size exceeds frame max' ) ;
113
+ }
114
+
115
+ const totalSize = FRAME_HEADER_BYTES + size + FRAME_END_BYTES
116
+
117
+ if ( bin . length < totalSize ) {
118
+ return false
119
+ }
120
+
121
+ const frameEnd = bin . readUInt8 ( FRAME_HEADER_BYTES + size )
122
+
123
+ if ( frameEnd !== FRAME_END ) {
124
+ throw new Error ( 'Invalid frame' )
125
+ }
79
126
80
- var methodPattern = Bits . matcher ( 'id:32, args/binary' ) ;
127
+ return {
128
+ type,
129
+ channel,
130
+ size,
131
+ payload : bin . subarray ( FRAME_HEADER_BYTES , FRAME_HEADER_BYTES + size ) ,
132
+ rest : bin . subarray ( totalSize )
133
+ }
134
+ }
135
+
136
+ module . exports . parseFrame = parseFrame ;
81
137
82
138
var HEARTBEAT = { channel : 0 } ;
83
139
84
- module . exports . decodeFrame = function ( frame ) {
85
- var payload = frame . payload ;
140
+ /**
141
+ * Decode AMQP frame into JS object
142
+ * @param { FrameStructure } frame
143
+ * @returns
144
+ */
145
+ module . exports . decodeFrame = ( frame ) => {
146
+ const payload = frame . payload
147
+ const channel = frame . channel
148
+
86
149
switch ( frame . type ) {
87
- case FRAME_METHOD :
88
- var idAndArgs = methodPattern ( payload ) ;
89
- var id = idAndArgs . id ;
90
- var fields = decode ( id , idAndArgs . args ) ;
91
- return { id : id , channel : frame . channel , fields : fields } ;
92
- case FRAME_HEADER :
93
- var parts = headerPattern ( payload ) ;
94
- var id = parts [ 'class' ] ;
95
- var fields = decode ( id , parts . flagsAndfields ) ;
96
- return { id : id , channel : frame . channel ,
97
- size : parts . size , fields : fields } ;
98
- case FRAME_BODY :
99
- return { channel : frame . channel , content : frame . payload } ;
100
- case FRAME_HEARTBEAT :
101
- return HEARTBEAT ;
102
- default :
103
- throw new Error ( 'Unknown frame type ' + frame . type ) ;
150
+ case FRAME_METHOD : {
151
+ const id = payload . readUInt32BE ( 0 )
152
+ const args = payload . subarray ( 4 )
153
+ const fields = decode ( id , args )
154
+ return { id, channel, fields }
155
+ }
156
+ case FRAME_HEADER : {
157
+ const id = payload . readUInt16BE ( 0 )
158
+ // const weight = payload.readUInt16BE(2)
159
+ const size = readInt64BE ( payload , 4 )
160
+ const flagsAndfields = payload . subarray ( 12 )
161
+ const fields = decode ( id , flagsAndfields )
162
+ return { id, channel, size, fields }
163
+ }
164
+ case FRAME_BODY :
165
+ return { channel, content : payload }
166
+ case FRAME_HEARTBEAT :
167
+ return HEARTBEAT
168
+ default :
169
+ throw new Error ( 'Unknown frame type ' + frame . type )
104
170
}
105
171
}
106
172
0 commit comments