@@ -26,56 +26,65 @@ module.exports = RedisPubSub;
26
26
27
27
RedisPubSub . prototype = Object . create ( PubSub . prototype ) ;
28
28
29
- RedisPubSub . prototype . close = function ( callback ) {
29
+ RedisPubSub . prototype . close = function ( callback ) {
30
30
if ( ! callback ) {
31
- callback = function ( err ) {
31
+ callback = function ( err ) {
32
32
if ( err ) throw err ;
33
33
} ;
34
34
}
35
35
var pubsub = this ;
36
- PubSub . prototype . close . call ( this , function ( err ) {
36
+ PubSub . prototype . close . call ( this , function ( err ) {
37
37
if ( err ) return callback ( err ) ;
38
- pubsub . _close ( ) . then ( function ( ) {
38
+ pubsub . _close ( ) . then ( function ( ) {
39
39
callback ( ) ;
40
40
} , callback ) ;
41
41
} ) ;
42
42
} ;
43
43
44
- RedisPubSub . prototype . _close = function ( ) {
45
- return this . _closing = this . _closing || this . _connect ( ) . then ( Promise . all ( [
46
- close ( this . client ) ,
47
- close ( this . observer )
48
- ] ) ) ;
44
+ RedisPubSub . prototype . _close = function ( ) {
45
+ var pubsub = this ;
46
+
47
+ if ( ! this . _closing ) {
48
+ this . _closing = this . _connect ( )
49
+ . then ( function ( ) {
50
+ return Promise . all ( [
51
+ close ( pubsub . client ) ,
52
+ close ( pubsub . observer )
53
+ ] )
54
+ } ) ;
55
+ }
56
+
57
+ return this . _closing ;
49
58
} ;
50
59
51
- RedisPubSub . prototype . _subscribe = function ( channel , callback ) {
60
+ RedisPubSub . prototype . _subscribe = function ( channel , callback ) {
52
61
var pubsub = this ;
53
62
pubsub . observer
54
- . subscribe ( channel , function ( message ) {
63
+ . subscribe ( channel , function ( message ) {
55
64
var data = JSON . parse ( message ) ;
56
65
pubsub . _emit ( channel , data ) ;
57
66
} )
58
- . then ( function ( ) {
67
+ . then ( function ( ) {
59
68
callback ( ) ;
60
69
} , callback ) ;
61
70
} ;
62
71
63
- RedisPubSub . prototype . _unsubscribe = function ( channel , callback ) {
72
+ RedisPubSub . prototype . _unsubscribe = function ( channel , callback ) {
64
73
this . observer . unsubscribe ( channel )
65
- . then ( function ( ) {
74
+ . then ( function ( ) {
66
75
callback ( ) ;
67
76
} , callback ) ;
68
77
} ;
69
78
70
- RedisPubSub . prototype . _publish = function ( channels , data , callback ) {
79
+ RedisPubSub . prototype . _publish = function ( channels , data , callback ) {
71
80
var message = JSON . stringify ( data ) ;
72
81
var args = [ message ] . concat ( channels ) ;
73
- this . client . eval ( PUBLISH_SCRIPT , { arguments : args } ) . then ( function ( ) {
82
+ this . client . eval ( PUBLISH_SCRIPT , { arguments : args } ) . then ( function ( ) {
74
83
callback ( ) ;
75
84
} , callback ) ;
76
85
} ;
77
86
78
- RedisPubSub . prototype . _connect = function ( ) {
87
+ RedisPubSub . prototype . _connect = function ( ) {
79
88
this . _clientConnection = this . _clientConnection || connect ( this . client ) ;
80
89
this . _observerConnection = this . _observerConnection || connect ( this . observer ) ;
81
90
return Promise . all ( [
@@ -90,7 +99,7 @@ function connect(client) {
90
99
91
100
var PUBLISH_SCRIPT =
92
101
'for i = 2, #ARGV do ' +
93
- 'redis.call("publish", ARGV[i], ARGV[1]) ' +
102
+ 'redis.call("publish", ARGV[i], ARGV[1]) ' +
94
103
'end' ;
95
104
96
105
function close ( client ) {
0 commit comments