Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Added logic to reset the client SessionID if the client disconnects a…
Browse files Browse the repository at this point in the history
…nd does not reconnect before the session timeout time elapses.
  • Loading branch information
Colin McIntosh authored and Rob Figueiredo committed Aug 18, 2021
1 parent 16f4ee1 commit 5715f4a
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,12 @@ func (c *Conn) sendRequest(
return rq.recvChan, nil
}

func (c *Conn) sessionExpired(d time.Time) bool {
return d.Add(time.Duration(c.sessionTimeoutMs) * time.Millisecond).Before(time.Now())
}

func (c *Conn) loop(ctx context.Context) {
var disconnectTime time.Time
for {
if err := c.connect(); err != nil {
// c.Close() was called
Expand All @@ -430,11 +435,15 @@ func (c *Conn) loop(ctx context.Context) {
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.logger.Printf("authentication failed: %s", err)
c.logger.Printf("authentication expired: %s", err)
c.resetSession()
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.logger.Printf("authentication failed: %s", err)
c.conn.Close()
if !disconnectTime.IsZero() && c.sessionExpired(disconnectTime) {
c.resetSession()
}
case err == nil:
if c.logInfo {
c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
Expand Down Expand Up @@ -483,6 +492,7 @@ func (c *Conn) loop(ctx context.Context) {
}

c.setState(StateDisconnected)
disconnectTime = time.Now()

select {
case <-c.shouldQuit:
Expand Down Expand Up @@ -672,9 +682,6 @@ func (c *Conn) authenticate() error {
return err
}
if r.SessionID == 0 {
atomic.StoreInt64(&c.sessionID, int64(0))
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
Expand All @@ -687,6 +694,12 @@ func (c *Conn) authenticate() error {
return nil
}

func (c *Conn) resetSession() {
atomic.StoreInt64(&c.sessionID, int64(0))
c.passwd = emptyPassword
c.lastZxid = 0
}

func (c *Conn) sendData(req *request) error {
header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(c.buf[4:], header)
Expand Down

0 comments on commit 5715f4a

Please sign in to comment.