@@ -577,9 +577,10 @@ func (c initConnection) SupportsStreaming() bool {
577
577
}
578
578
579
579
// Connection implements the driver.Connection interface to allow reading and writing wire
580
- // messages and the driver.Expirable interface to allow expiring.
580
+ // messages and the driver.Expirable interface to allow expiring. It wraps an underlying
581
+ // topology.connection to make it more goroutine-safe and nil-safe.
581
582
type Connection struct {
582
- * connection
583
+ connection * connection
583
584
refCount int
584
585
cleanupPoolFn func ()
585
586
@@ -601,7 +602,7 @@ func (c *Connection) WriteWireMessage(ctx context.Context, wm []byte) error {
601
602
if c .connection == nil {
602
603
return ErrConnectionClosed
603
604
}
604
- return c .writeWireMessage (ctx , wm )
605
+ return c .connection . writeWireMessage (ctx , wm )
605
606
}
606
607
607
608
// ReadWireMessage handles reading a wire message from the underlying connection. The dst parameter
@@ -612,7 +613,7 @@ func (c *Connection) ReadWireMessage(ctx context.Context, dst []byte) ([]byte, e
612
613
if c .connection == nil {
613
614
return dst , ErrConnectionClosed
614
615
}
615
- return c .readWireMessage (ctx , dst )
616
+ return c .connection . readWireMessage (ctx , dst )
616
617
}
617
618
618
619
// CompressWireMessage handles compressing the provided wire message using the underlying
@@ -658,7 +659,7 @@ func (c *Connection) Description() description.Server {
658
659
if c .connection == nil {
659
660
return description.Server {}
660
661
}
661
- return c .desc
662
+ return c .connection . desc
662
663
}
663
664
664
665
// Close returns this connection to the connection pool. This method may not closeConnection the underlying
@@ -681,12 +682,12 @@ func (c *Connection) Expire() error {
681
682
return nil
682
683
}
683
684
684
- _ = c .close ()
685
+ _ = c .connection . close ()
685
686
return c .cleanupReferences ()
686
687
}
687
688
688
689
func (c * Connection ) cleanupReferences () error {
689
- err := c .pool .checkIn (c .connection )
690
+ err := c .connection . pool .checkIn (c .connection )
690
691
if c .cleanupPoolFn != nil {
691
692
c .cleanupPoolFn ()
692
693
c .cleanupPoolFn = nil
@@ -711,14 +712,22 @@ func (c *Connection) ID() string {
711
712
if c .connection == nil {
712
713
return "<closed>"
713
714
}
714
- return c .id
715
+ return c .connection .id
716
+ }
717
+
718
+ // ServerConnectionID returns the server connection ID of this connection.
719
+ func (c * Connection ) ServerConnectionID () * int32 {
720
+ if c .connection == nil {
721
+ return nil
722
+ }
723
+ return c .connection .serverConnectionID
715
724
}
716
725
717
726
// Stale returns if the connection is stale.
718
727
func (c * Connection ) Stale () bool {
719
728
c .mu .RLock ()
720
729
defer c .mu .RUnlock ()
721
- return c .pool .stale (c .connection )
730
+ return c .connection . pool .stale (c .connection )
722
731
}
723
732
724
733
// Address returns the address of this connection.
@@ -728,27 +737,27 @@ func (c *Connection) Address() address.Address {
728
737
if c .connection == nil {
729
738
return address .Address ("0.0.0.0" )
730
739
}
731
- return c .addr
740
+ return c .connection . addr
732
741
}
733
742
734
743
// LocalAddress returns the local address of the connection
735
744
func (c * Connection ) LocalAddress () address.Address {
736
745
c .mu .RLock ()
737
746
defer c .mu .RUnlock ()
738
- if c .connection == nil || c .nc == nil {
747
+ if c .connection == nil || c .connection . nc == nil {
739
748
return address .Address ("0.0.0.0" )
740
749
}
741
- return address .Address (c .nc .LocalAddr ().String ())
750
+ return address .Address (c .connection . nc .LocalAddr ().String ())
742
751
}
743
752
744
753
// PinToCursor updates this connection to reflect that it is pinned to a cursor.
745
754
func (c * Connection ) PinToCursor () error {
746
- return c .pin ("cursor" , c .pool .pinConnectionToCursor , c .pool .unpinConnectionFromCursor )
755
+ return c .pin ("cursor" , c .connection . pool .pinConnectionToCursor , c . connection .pool .unpinConnectionFromCursor )
747
756
}
748
757
749
758
// PinToTransaction updates this connection to reflect that it is pinned to a transaction.
750
759
func (c * Connection ) PinToTransaction () error {
751
- return c .pin ("transaction" , c .pool .pinConnectionToTransaction , c .pool .unpinConnectionFromTransaction )
760
+ return c .pin ("transaction" , c .connection . pool .pinConnectionToTransaction , c . connection .pool .unpinConnectionFromTransaction )
752
761
}
753
762
754
763
func (c * Connection ) pin (reason string , updatePoolFn , cleanupPoolFn func ()) error {
0 commit comments