diff --git a/service/process.go b/service/process.go index bbdc5f3..945aa1a 100644 --- a/service/process.go +++ b/service/process.go @@ -174,7 +174,7 @@ func (this *service) processIncoming(msg message.Message) error { case *message.DisconnectMessage: // For DISCONNECT message, we should quit - this.sess.Cmsg.SetWillFlag(false) + this.sess.SetWillFlag(false) return errDisconnect default: diff --git a/service/service.go b/service/service.go index cd23342..a3de565 100644 --- a/service/service.go +++ b/service/service.go @@ -235,7 +235,7 @@ func (this *service) stop() { } // Publish will message if WillFlag is set. Server side only. - if !this.client && this.sess.Cmsg.WillFlag() { + if !this.client && this.sess.WillFlag() { glog.Infof("(%s) service/stop: connection unexpectedly closed. Sending Will.", this.cid()) this.onPublish(this.sess.Will) } @@ -246,7 +246,7 @@ func (this *service) stop() { } // Remove the session from session store if it's suppose to be clean session - if this.sess.Cmsg.CleanSession() && this.sessMgr != nil { + if this.sess.CleanSession() && this.sessMgr != nil { this.sessMgr.Del(this.sess.ID()) } diff --git a/sessions/session.go b/sessions/session.go index 386d2f7..1745c47 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -46,7 +46,7 @@ type Session struct { Pingack *Ackqueue // cmsg is the CONNECT message - Cmsg *message.ConnectMessage + cmsg *message.ConnectMessage // Will message to publish if connect is closed unexpectedly Will *message.PublishMessage @@ -201,3 +201,20 @@ func (this *Session) Topics() ([]string, []byte, error) { func (this *Session) ID() string { return string(this.Cmsg.ClientId()) } +func (this *Session) WillFlag() bool { + this.mu.Lock() + defer this.mu.Unlock() + return this.cmsg.WillFlag() +} + +func (this *Session) SetWillFlag(v bool) { + this.mu.Lock() + defer this.mu.Unlock() + this.cmsg.SetWillFlag(v) +} + +func (this *Session) CleanSession() bool { + this.mu.Lock() + defer this.mu.Unlock() + return this.cmsg.CleanSession() +} diff --git a/topics/memtopics.go b/topics/memtopics.go index 3cddfa9..d316e88 100644 --- a/topics/memtopics.go +++ b/topics/memtopics.go @@ -125,8 +125,12 @@ func (this *memTopics) Retained(topic []byte, msgs *[]*message.PublishMessage) e } func (this *memTopics) Close() error { + this.smu.Lock() this.sroot = nil + this.smu.Unlock() + this.rmu.Lock() this.rroot = nil + this.rmu.Unlock() return nil }