diff --git a/event/monitoring.go b/event/monitoring.go index ac05e401cc..15eda1dab1 100644 --- a/event/monitoring.go +++ b/event/monitoring.go @@ -105,6 +105,7 @@ type PoolEvent struct { // ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field // can be used to distinguish between individual servers in a load balanced deployment. ServiceID *primitive.ObjectID `json:"serviceId"` + Error error `json:"error"` } // PoolMonitor is a function that allows the user to gain access to events occurring in the pool diff --git a/internal/logger/component.go b/internal/logger/component.go index 1617026849..642712b05c 100644 --- a/internal/logger/component.go +++ b/internal/logger/component.go @@ -14,9 +14,60 @@ import ( ) const ( - CommandFailed = "Command failed" - CommandStarted = "Command started" - CommandSucceeded = "Command succeeded" + CommandFailed = "Command failed" + CommandStarted = "Command started" + CommandSucceeded = "Command succeeded" + ConnectionPoolCreated = "Connection pool created" + ConnectionPoolReady = "Connection pool ready" + ConnectionPoolCleared = "Connection pool cleared" + ConnectionPoolClosed = "Connection pool closed" + ConnectionCreated = "Connection created" + ConnectionReady = "Connection ready" + ConnectionClosed = "Connection closed" + ConnectionCheckoutStarted = "Connection checkout started" + ConnectionCheckoutFailed = "Connection checkout failed" + ConnectionCheckedOut = "Connection checked out" + ConnectionCheckedIn = "Connection checked in" +) + +const ( + KeyCommand = "command" + KeyCommandName = "commandName" + KeyDatabaseName = "databaseName" + KeyDriverConnectionID = "driverConnectionId" + KeyDurationMS = "durationMS" + KeyError = "error" + KeyFailure = "failure" + KeyMaxConnecting = "maxConnecting" + KeyMaxIdleTimeMS = "maxIdleTimeMS" + KeyMaxPoolSize = "maxPoolSize" + KeyMessage = "message" + KeyMinPoolSize = "minPoolSize" + KeyOperationID = "operationId" + KeyReason = "reason" + KeyReply = "reply" + KeyRequestID = "requestId" + KeyServerConnectionID = "serverConnectionId" + KeyServerHost = "serverHost" + KeyServerPort = "serverPort" + KeyServiceID = "serviceId" + KeyTimestamp = "timestamp" +) + +type KeyValues []interface{} + +func (kvs *KeyValues) Add(key string, value interface{}) { + *kvs = append(*kvs, key, value) +} + +const ( + ReasonConnClosedStale = "Connection became stale because the pool was cleared" + ReasonConnClosedIdle = "Connection has been available but unused for longer than the configured max idle time" + ReasonConnClosedError = "An error occurred while using the connection" + ReasonConnClosedPoolClosed = "Connection pool was closed" + ReasonConnCheckoutFailedTimout = "Wait queue timeout elapsed without a connection becoming available" + ReasonConnCheckoutFailedError = "An error occurred while trying to establish a new connection" + ReasonConnCheckoutFailedPoolClosed = "Connection pool was closed" ) // Component is an enumeration representing the "components" which can be @@ -87,31 +138,62 @@ type Command struct { // structured logging. func SerializeCommand(cmd Command, extraKeysAndValues ...interface{}) []interface{} { // Initialize the boilerplate keys and values. - keysAndValues := append([]interface{}{ - "commandName", cmd.Name, - "driverConnectionId", cmd.DriverConnectionID, - "message", cmd.Message, - "operationId", cmd.OperationID, - "requestId", cmd.RequestID, - "serverHost", cmd.ServerHost, - }, extraKeysAndValues...) 
+ keysAndValues := KeyValues{ + KeyCommandName, cmd.Name, + KeyDriverConnectionID, cmd.DriverConnectionID, + KeyMessage, cmd.Message, + KeyOperationID, cmd.OperationID, + KeyRequestID, cmd.RequestID, + KeyServerHost, cmd.ServerHost, + } + + // Add the extra keys and values. + for i := 0; i < len(extraKeysAndValues); i += 2 { + keysAndValues.Add(extraKeysAndValues[i].(string), extraKeysAndValues[i+1]) + } - // Add the optional keys and values. port, err := strconv.ParseInt(cmd.ServerPort, 0, 32) if err == nil { - keysAndValues = append(keysAndValues, "serverPort", port) + keysAndValues.Add(KeyServerPort, port) } // Add the "serverConnectionId" if it is not nil. if cmd.ServerConnectionID != nil { - keysAndValues = append(keysAndValues, - "serverConnectionId", *cmd.ServerConnectionID) + keysAndValues.Add(KeyServerConnectionID, *cmd.ServerConnectionID) } // Add the "serviceId" if it is not nil. if cmd.ServiceID != nil { - keysAndValues = append(keysAndValues, - "serviceId", cmd.ServiceID.Hex()) + keysAndValues.Add(KeyServiceID, cmd.ServiceID.Hex()) + } + + return keysAndValues +} + +// Connection contains data that all connection log messages MUST contain. +type Connection struct { + Message string // Message associated with the connection + ServerHost string // Hostname or IP address for the server + ServerPort string // Port for the server +} + +// SerializeConnection serializes a ConnectionMessage into a slice of keys +// and values that can be passed to a logger. +func SerializeConnection(conn Connection, extraKeysAndValues ...interface{}) []interface{} { + // Initialize the boilerplate keys and values. + keysAndValues := KeyValues{ + KeyMessage, conn.Message, + KeyServerHost, conn.ServerHost, + } + + // Add the optional keys and values. + for i := 0; i < len(extraKeysAndValues); i += 2 { + keysAndValues.Add(extraKeysAndValues[i].(string), extraKeysAndValues[i+1]) + } + + port, err := strconv.ParseInt(conn.ServerPort, 0, 32) + if err == nil { + keysAndValues.Add(KeyServerPort, port) } return keysAndValues diff --git a/internal/logger/io_sink.go b/internal/logger/io_sink.go index d2ec746d36..4aa8f08b31 100644 --- a/internal/logger/io_sink.go +++ b/internal/logger/io_sink.go @@ -7,91 +7,52 @@ package logger import ( + "encoding/json" "io" - "log" + "sync" + "time" ) -// IOSink writes to an io.Writer using the standard library logging solution and -// is the default sink for the logger, with the default IO being os.Stderr. +// IOSink writes a JSON-encoded message to the io.Writer. type IOSink struct { - log *log.Logger + enc *json.Encoder + + // encMu protects the encoder from concurrent writes. While the logger + // itself does not concurrently write to the sink, the sink may be used + // concurrently within the driver. + encMu sync.Mutex } -// Compile-time check to ensure osSink implements the LogSink interface. +// Compile-time check to ensure IOSink implements the LogSink interface. var _ LogSink = &IOSink{} -// NewIOSink will create a new IOSink that writes to the provided io.Writer. +// NewIOSink will create an IOSink object that writes JSON messages to the +// provided io.Writer. func NewIOSink(out io.Writer) *IOSink { return &IOSink{ - log: log.New(out, "", log.LstdFlags), + enc: json.NewEncoder(out), } } -func logCommandMessageStarted(log *log.Logger, kvMap map[string]interface{}) { - format := "Command %q started on database %q using a connection with " + - "server-generated ID %d to %s:%d. The requestID is %d and " + - "the operation ID is %d. 
Command: %s" - - log.Printf(format, - kvMap["commandName"], - kvMap["databaseName"], - kvMap["serverConnectionId"], - kvMap["serverHost"], - kvMap["serverPort"], - kvMap["requestId"], - kvMap["operationId"], - kvMap["command"]) - -} - -func logCommandMessageSucceeded(log *log.Logger, kvMap map[string]interface{}) { - format := "Command %q succeeded in %d ms using server-generated ID " + - "%d to %s:%d. The requestID is %d and the operation ID is " + - "%d. Command reply: %s" +// Info will write a JSON-encoded message to the io.Writer. +func (sink *IOSink) Info(_ int, msg string, keysAndValues ...interface{}) { + kvMap := make(map[string]interface{}, len(keysAndValues)/2+2) - log.Printf(format, - kvMap["commandName"], - kvMap["duration"], - kvMap["serverConnectionId"], - kvMap["serverHost"], - kvMap["serverPort"], - kvMap["requestId"], - kvMap["operationId"], - kvMap["reply"]) -} + kvMap[KeyTimestamp] = time.Now().UnixNano() + kvMap[KeyMessage] = msg -func logCommandMessageFailed(log *log.Logger, kvMap map[string]interface{}) { - format := "Command %q failed in %d ms using a connection with " + - "server-generated ID %d to %s:%d. The requestID is %d and " + - "the operation ID is %d. Error: %s" - - log.Printf(format, - kvMap["commandName"], - kvMap["duration"], - kvMap["serverConnectionID"], - kvMap["serverHost"], - kvMap["serverPort"], - kvMap["requestId"], - kvMap["operationId"], - kvMap["failure"]) -} - -func (osSink *IOSink) Info(_ int, msg string, keysAndValues ...interface{}) { - kvMap := make(map[string]interface{}) for i := 0; i < len(keysAndValues); i += 2 { kvMap[keysAndValues[i].(string)] = keysAndValues[i+1] } - switch msg { - case CommandStarted: - logCommandMessageStarted(osSink.log, kvMap) - case CommandSucceeded: - logCommandMessageSucceeded(osSink.log, kvMap) - case CommandFailed: - logCommandMessageFailed(osSink.log, kvMap) - } + sink.encMu.Lock() + defer sink.encMu.Unlock() + + _ = sink.enc.Encode(kvMap) } -func (osSink *IOSink) Error(err error, msg string, kv ...interface{}) { - osSink.Info(0, msg, kv...) +// Error will write a JSON-encoded error message tot he io.Writer. +func (sink *IOSink) Error(err error, msg string, kv ...interface{}) { + kv = append(kv, KeyError, err.Error()) + sink.Info(0, msg, kv...) } diff --git a/internal/logger/logger_test.go b/internal/logger/logger_test.go index dd8a5bcd12..861199364a 100644 --- a/internal/logger/logger_test.go +++ b/internal/logger/logger_test.go @@ -7,8 +7,12 @@ package logger import ( + "bytes" + "encoding/json" + "fmt" "os" "reflect" + "sync" "testing" ) @@ -33,12 +37,83 @@ func BenchmarkLogger(b *testing.B) { b.Fatal(err) } - for i := 0; i < b.N; i++ { - logger.Print(LevelInfo, ComponentCommand, "foo", "bar", "baz") + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + logger.Print(LevelInfo, ComponentCommand, "foo", "bar", "baz") + } + }) + }) +} + +func mockKeyValues(length int) (KeyValues, map[string]interface{}) { + keysAndValues := KeyValues{} + m := map[string]interface{}{} + + for i := 0; i < length; i++ { + keyName := fmt.Sprintf("key%d", i) + valueName := fmt.Sprintf("value%d", i) + + keysAndValues.Add(keyName, valueName) + m[keyName] = valueName + } + + return keysAndValues, m +} + +func BenchmarkIOSinkInfo(b *testing.B) { + keysAndValues, _ := mockKeyValues(10) + + b.ReportAllocs() + b.ResetTimer() + + sink := NewIOSink(bytes.NewBuffer(nil)) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sink.Info(0, "foo", keysAndValues...) 
} }) } +func TestIOSinkInfo(t *testing.T) { + t.Parallel() + + const threshold = 1000 + + mockKeyValues, kvmap := mockKeyValues(10) + + buf := new(bytes.Buffer) + sink := NewIOSink(buf) + + wg := sync.WaitGroup{} + wg.Add(threshold) + + for i := 0; i < threshold; i++ { + go func() { + defer wg.Done() + + sink.Info(0, "foo", mockKeyValues...) + }() + } + + wg.Wait() + + dec := json.NewDecoder(buf) + for dec.More() { + var m map[string]interface{} + if err := dec.Decode(&m); err != nil { + t.Fatalf("error unmarshaling JSON: %v", err) + } + + delete(m, KeyTimestamp) + delete(m, KeyMessage) + + if !reflect.DeepEqual(m, kvmap) { + t.Fatalf("expected %v, got %v", kvmap, m) + } + } +} + func TestSelectMaxDocumentLength(t *testing.T) { t.Parallel() diff --git a/mongo/integration/unified/client_entity.go b/mongo/integration/unified/client_entity.go index 99d32cec2a..a32203daca 100644 --- a/mongo/integration/unified/client_entity.go +++ b/mongo/integration/unified/client_entity.go @@ -40,6 +40,7 @@ var securitySensitiveCommands = []string{"authenticate", "saslStart", "saslConti // execution. type clientEntity struct { *mongo.Client + disconnected bool recordEvents atomic.Value started []*event.CommandStartedEvent @@ -199,6 +200,25 @@ func getURIForClient(opts *entityOptions) string { } } +// disconnect disconnects the client associated with this entity. It is an +// idempotent operation, unlike the mongo client's disconnect method. This +// property will help avoid unnecessary errors when calling disconnect on a +// client that has already been disconnected, such as the case when the test +// runner is required to run the closure as part of an operation. +func (c *clientEntity) disconnect(ctx context.Context) error { + if c.disconnected { + return nil + } + + if err := c.Client.Disconnect(ctx); err != nil { + return err + } + + c.disconnected = true + + return nil +} + func (c *clientEntity) stopListeningForEvents() { c.setRecordEvents(false) } @@ -458,8 +478,14 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond) case "loadBalanced": clientOpts.SetLoadBalanced(value.(bool)) + case "maxIdleTimeMS": + clientOpts.SetMaxConnIdleTime(time.Duration(value.(int32)) * time.Millisecond) + case "minPoolSize": + clientOpts.SetMinPoolSize(uint64(value.(int32))) case "maxPoolSize": clientOpts.SetMaxPoolSize(uint64(value.(int32))) + case "maxConnecting": + clientOpts.SetMaxConnecting(uint64(value.(int32))) case "readConcernLevel": clientOpts.SetReadConcern(readconcern.New(readconcern.Level(value.(string)))) case "retryReads": @@ -473,6 +499,8 @@ func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts b wcSet = true case "waitQueueTimeoutMS": return newSkipTestError("the waitQueueTimeoutMS client option is not supported") + case "waitQueueSize": + return newSkipTestError("the waitQueueSize client option is not supported") case "timeoutMS": clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond) default: diff --git a/mongo/integration/unified/cursor_operation_execution.go b/mongo/integration/unified/cursor_operation_execution.go index 06777660e2..390e844ad0 100644 --- a/mongo/integration/unified/cursor_operation_execution.go +++ b/mongo/integration/unified/cursor_operation_execution.go @@ -13,17 +13,6 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -func executeClose(ctx context.Context, operation *operation) error { - cursor, err := 
entities(ctx).cursor(operation.Object) - if err != nil { - return err - } - - // Per the spec, we ignore all errors from Close. - _ = cursor.Close(ctx) - return nil -} - func executeIterateOnce(ctx context.Context, operation *operation) (*operationResult, error) { cursor, err := entities(ctx).cursor(operation.Object) if err != nil { diff --git a/mongo/integration/unified/entity.go b/mongo/integration/unified/entity.go index e9b6f6e379..f3de52bd06 100644 --- a/mongo/integration/unified/entity.go +++ b/mongo/integration/unified/entity.go @@ -23,8 +23,10 @@ import ( "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) -// ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open -var ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open") +var ( + // ErrEntityMapOpen is returned when a slice entity is accessed while the EntityMap is open + ErrEntityMapOpen = errors.New("slices cannot be accessed while EntityMap is open") +) var ( tlsCAFile = os.Getenv("CSFLE_TLS_CA_FILE") @@ -403,7 +405,8 @@ func (em *EntityMap) close(ctx context.Context) []error { // Client will be closed in clientEncryption.Close() continue } - if err := client.Disconnect(ctx); err != nil { + + if err := client.disconnect(ctx); err != nil { errs = append(errs, fmt.Errorf("error closing client with ID %q: %v", id, err)) } } diff --git a/mongo/integration/unified/logger_verification.go b/mongo/integration/unified/logger_verification.go index 19823d4e99..3d4b027e97 100644 --- a/mongo/integration/unified/logger_verification.go +++ b/mongo/integration/unified/logger_verification.go @@ -9,6 +9,7 @@ package unified import ( "context" "fmt" + "sync" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/internal/logger" @@ -74,24 +75,6 @@ func newLogMessage(level int, args ...interface{}) (*logMessage, error) { return logMessage, nil } -// validate will validate the expectedLogMessage and return an error if it is -// invalid. -func validateLogMessage(message *logMessage) error { - if message.LevelLiteral == "" { - return fmt.Errorf("%w: level is required", errLoggerVerification) - } - - if message.ComponentLiteral == "" { - return fmt.Errorf("%w: component is required", errLoggerVerification) - } - - if message.Data == nil { - return fmt.Errorf("%w: data is required", errLoggerVerification) - } - - return nil -} - // clientLogMessages is a struct representing the expected "LogMessages" for a // client. type clientLogMessages struct { @@ -99,123 +82,39 @@ type clientLogMessages struct { LogMessages []*logMessage `bson:"messages"` } -// validateClientLogMessages will validate a single "clientLogMessages" object -// and return an error if it is invalid, i.e. not testable. -func validateClientLogMessages(log *clientLogMessages) error { - if log.Client == "" { - return fmt.Errorf("%w: client is required", errLoggerVerification) - } - - if len(log.LogMessages) == 0 { - return fmt.Errorf("%w: log messages are required", errLoggerVerification) - } - - for _, message := range log.LogMessages { - if err := validateLogMessage(message); err != nil { - return fmt.Errorf("%w: message is invalid: %v", errLoggerVerification, err) - } - } - - return nil -} - -// validateExpectLogMessages will validate a slice of "clientLogMessages" -// objects and return the first error encountered. 
-func validateExpectLogMessages(logs []*clientLogMessages) error { - seenClientNames := make(map[string]struct{}) // Check for client duplication - - for _, log := range logs { - if err := validateClientLogMessages(log); err != nil { - return fmt.Errorf("%w: client is invalid: %v", errLoggerVerification, err) - } - - if _, ok := seenClientNames[log.Client]; ok { - return fmt.Errorf("%w: duplicate client: %v", errLoggerVerification, log.Client) - } - - seenClientNames[log.Client] = struct{}{} - } - - return nil -} - // logMessageValidator defines the expectation for log messages across all // clients. type logMessageValidator struct { - testCase *TestCase - err map[string]chan error - expectedLogMessageCount int + testCase *TestCase + clientErrs map[string]chan error } -// newLogMessageValidator will create a new "logMessageValidator" from a test -// case. -func newLogMessageValidator(testCase *TestCase) (*logMessageValidator, error) { - if testCase == nil { - return nil, fmt.Errorf("%w: test case is required", errLoggerVerification) - } - - if testCase.entities == nil { - return nil, fmt.Errorf("%w: entities are required", errLoggerVerification) - } - +// newLogMessageValidator will create a new logMessageValidator. +func newLogMessageValidator(testCase *TestCase) *logMessageValidator { validator := &logMessageValidator{testCase: testCase} - validator.err = make(map[string]chan error) + validator.clientErrs = make(map[string]chan error) - for _, elm := range testCase.ExpectLogMessages { - validator.err[elm.Client] = make(chan error, 1) - validator.expectedLogMessageCount += len(elm.LogMessages) + // Make the error channels for the clients. + for _, exp := range testCase.ExpectLogMessages { + validator.clientErrs[exp.Client] = make(chan error) } - return validator, nil + return validator } -type actualLogQueues map[string]chan orderedLogMessage - -func (validator *logMessageValidator) expected(ctx context.Context) ([]*clientLogMessages, actualLogQueues) { +func logQueue(ctx context.Context, exp *clientLogMessages) <-chan orderedLogMessage { clients := entities(ctx).clients() - expected := make([]*clientLogMessages, 0, len(validator.testCase.ExpectLogMessages)) - actual := make(actualLogQueues, len(clients)) - - for _, clientLogMessages := range validator.testCase.ExpectLogMessages { - clientName := clientLogMessages.Client - - clientEntity, ok := clients[clientName] - if !ok { - continue // If there is no entity for the client, skip it. - } - - expected = append(expected, clientLogMessages) - actual[clientName] = clientEntity.logQueue - } - - return expected, actual -} - -// stopLogMessageVerificationWorkers will gracefully validate all log messages -// received by all clients and return the first error encountered. -func stopLogMessageVerificationWorkers(ctx context.Context, validator *logMessageValidator) error { - // Listen for each client's error, if any. If the context deadtline is - // exceeded, return an error. - for clientName, errChan := range validator.err { - select { - case err := <-errChan: - if err != nil { - return fmt.Errorf("%w: client %q: %v", - errLoggerVerification, clientName, err) - } - case <-ctx.Done(): - return fmt.Errorf("%w: context error: %v", - errLoggerVerification, ctx.Err()) - } + clientEntity, ok := clients[exp.Client] + if !ok { + return nil } - return nil + return clientEntity.logQueue } -// verifyLogMessagesMatch will verify that the actual log messages match the -// expected log messages. 
-func verifyLogMessagesMatch(ctx context.Context, exp, act *logMessage) error { +// verifyLogMatch will verify that the actual log match the expected log. +func verifyLogMatch(ctx context.Context, exp, act *logMessage) error { if act == nil && exp == nil { return nil } @@ -247,37 +146,185 @@ func verifyLogMessagesMatch(ctx context.Context, exp, act *logMessage) error { return nil } -func (validator *logMessageValidator) validate(ctx context.Context, exp *clientLogMessages, - queue <-chan orderedLogMessage) { - for actual := range queue { - actMsg := actual.logMessage - expMsg := exp.LogMessages[actual.order-2] +// isUnorderedLog will return true if the log is/should be unordered in the Go +// Driver. +func isUnorderedLog(log *logMessage) bool { + msg, err := log.Data.LookupErr(logger.KeyMessage) + if err != nil { + return false + } + + msgStr := msg.StringValue() + + // There is a race condition in the connection pool's workflow where it + // is non-deterministic whether the connection pool will fail a checkout + // or close a connection first. Because of this, either log may be + // received in any order. To account for this behavior, we considered + // both logs to be "unordered". + return msgStr == logger.ConnectionCheckoutFailed || + msgStr == logger.ConnectionClosed +} - if expMsg == nil { - continue +type logQueues struct { + expected *clientLogMessages + ordered <-chan *logMessage + unordered <-chan *logMessage +} + +func partitionLogQueue(ctx context.Context, exp *clientLogMessages) logQueues { + orderedLogCh := make(chan *logMessage, len(exp.LogMessages)) + unorderedLogCh := make(chan *logMessage, len(exp.LogMessages)) + + // Get the unordered indices from the expected log messages. + unorderedIndices := make(map[int]struct{}) + for i, log := range exp.LogMessages { + if isUnorderedLog(log) { + unorderedIndices[i] = struct{}{} + } + } + + go func() { + defer close(orderedLogCh) + defer close(unorderedLogCh) + + for actual := range logQueue(ctx, exp) { + msg := actual.logMessage + if _, ok := unorderedIndices[actual.order-2]; ok { + unorderedLogCh <- msg + } else { + orderedLogCh <- msg + } } + }() + + return logQueues{ + expected: exp, + ordered: orderedLogCh, + unordered: unorderedLogCh, + } +} - err := verifyLogMessagesMatch(ctx, expMsg, actMsg) - if err != nil { - validator.err[exp.Client] <- fmt.Errorf( - "%w: for client %q on message %d: %v", - errLoggerVerification, exp.Client, actual.order, err) +func matchOrderedLogs(ctx context.Context, logs logQueues) <-chan error { + // Remove all of the unordered log messages from the expected. + expLogMessages := make([]*logMessage, 0, len(logs.expected.LogMessages)) + for _, log := range logs.expected.LogMessages { + if !isUnorderedLog(log) { + expLogMessages = append(expLogMessages, log) } } - close(validator.err[exp.Client]) + errs := make(chan error, 1) + + go func() { + defer close(errs) + + for actual := range logs.ordered { + expected := expLogMessages[0] + if expected == nil { + continue + } + + err := verifyLogMatch(ctx, expected, actual) + if err != nil { + errs <- err + } + + // Remove the first element from the expected log. + expLogMessages = expLogMessages[1:] + } + }() + + return errs } -// startLogMessageVerificationWorkers will start a goroutine for each client's -// expected log messages, listening to the channel of actual log messages and -// comparing them to the expected log messages. 
-func startLogMessageVerificationWorkers(ctx context.Context, validator *logMessageValidator) {
-	expected, actual := validator.expected(ctx)
-	for _, expected := range expected {
-		if expected == nil {
-			continue
+func matchUnorderedLogs(ctx context.Context, logs logQueues) <-chan error {
+	unordered := make(map[*logMessage]struct{}, len(logs.expected.LogMessages))
+
+	for _, log := range logs.expected.LogMessages {
+		if isUnorderedLog(log) {
+			unordered[log] = struct{}{}
+		}
+	}
+
+	errs := make(chan error, 1)
+
+	go func() {
+		defer close(errs)
+
+		for actual := range logs.unordered {
+			var err error
+
+			// Iterate over the unordered log messages and verify
+			// that at least one of them matches the actual log
+			// message.
+			for expected := range unordered {
+				err = verifyLogMatch(ctx, expected, actual)
+				if err == nil {
+					// Remove the matched unordered log
+					// message from the unordered map.
+					delete(unordered, expected)
+
+					break
+				}
+			}
+
+			// If there was no match, return an error.
+			if err != nil {
+				errs <- err
+			}
 		}
+	}()
+
+	return errs
+}
+
+// startLogValidators will start a goroutine for each client's expected log
+// messages, listening to the channel of actual log messages and comparing them
+// to the expected log messages.
+func startLogValidators(ctx context.Context, validator *logMessageValidator) {
+	for _, expected := range validator.testCase.ExpectLogMessages {
+		logs := partitionLogQueue(ctx, expected)
+
+		wg := &sync.WaitGroup{}
+		wg.Add(2)
+
+		go func(expected *clientLogMessages) {
+			defer wg.Done()
+
+			validator.clientErrs[expected.Client] <- <-matchOrderedLogs(ctx, logs)
+		}(expected)
+
+		go func(expected *clientLogMessages) {
+			defer wg.Done()
+
+			validator.clientErrs[expected.Client] <- <-matchUnorderedLogs(ctx, logs)
+		}(expected)
+
+		go func(expected *clientLogMessages) {
+			wg.Wait()
 
-		go validator.validate(ctx, expected, actual[expected.Client])
+			close(validator.clientErrs[expected.Client])
+		}(expected)
 	}
 }
+
+func stopLogValidatorsErr(clientName string, err error) error {
+	return fmt.Errorf("%w: %s: %v", errLoggerVerification, clientName, err)
+}
+
+// stopLogValidators will gracefully validate all log messages received by all
+// clients and return the first error encountered. 
+func stopLogValidators(ctx context.Context, validator *logMessageValidator) error { + for clientName, errChan := range validator.clientErrs { + select { + case err := <-errChan: + if err != nil { + return stopLogValidatorsErr(clientName, err) + } + case <-ctx.Done(): + return stopLogValidatorsErr(clientName, ctx.Err()) + } + } + + return nil +} diff --git a/mongo/integration/unified/operation.go b/mongo/integration/unified/operation.go index a0808c10bd..4034543e14 100644 --- a/mongo/integration/unified/operation.go +++ b/mongo/integration/unified/operation.go @@ -208,7 +208,19 @@ func (op *operation) run(ctx context.Context, loopDone <-chan struct{}) (*operat // Cursor operations case "close": - return newEmptyResult(), executeClose(ctx, op) + if cursor, err := entities(ctx).cursor(op.Object); err == nil { + _ = cursor.Close(ctx) + + return newEmptyResult(), nil + } + + if clientEntity, err := entities(ctx).client(op.Object); err == nil { + _ = clientEntity.disconnect(context.Background()) + + return newEmptyResult(), nil + } + + return nil, fmt.Errorf("failed to find a cursor or client named %q", op.Object) case "iterateOnce": return executeIterateOnce(ctx, op) case "iterateUntilDocumentOrError": diff --git a/mongo/integration/unified/unified_spec_runner.go b/mongo/integration/unified/unified_spec_runner.go index d7358e1df6..80e7b8f4c4 100644 --- a/mongo/integration/unified/unified_spec_runner.go +++ b/mongo/integration/unified/unified_spec_runner.go @@ -218,10 +218,6 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { return fmt.Errorf("schema version %q not supported: %v", tc.schemaVersion, err) } - // Validate the ExpectLogMessages. - if err := validateExpectLogMessages(tc.ExpectLogMessages); err != nil { - return fmt.Errorf("invalid ExpectLogMessages: %v", err) - } // Count the number of expected log messages over all clients. expectedLogCount := 0 for _, clientLog := range tc.ExpectLogMessages { @@ -305,14 +301,10 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { } } - // Create a validator for log messages and start the workers that will observe log messages as they occur - // operationally. - logMessageValidator, err := newLogMessageValidator(tc) - if err != nil { - return fmt.Errorf("error creating logMessageValidator: %v", err) - } - - go startLogMessageVerificationWorkers(testCtx, logMessageValidator) + // Create a validator for log messages and start the workers that will + // observe log messages as they occur operationally. + logMessageValidator := newLogMessageValidator(tc) + go startLogValidators(testCtx, logMessageValidator) for _, client := range tc.entities.clients() { client.stopListeningForEvents() @@ -346,7 +338,7 @@ func (tc *TestCase) Run(ls LoggerSkipper) error { // For each client, verify that all expected log messages were // received. 
- if err := stopLogMessageVerificationWorkers(ctx, logMessageValidator); err != nil { + if err := stopLogValidators(ctx, logMessageValidator); err != nil { return fmt.Errorf("error verifying log messages: %w", err) } } diff --git a/mongo/integration/unified/unified_spec_test.go b/mongo/integration/unified/unified_spec_test.go index 327a43d358..edfa481255 100644 --- a/mongo/integration/unified/unified_spec_test.go +++ b/mongo/integration/unified/unified_spec_test.go @@ -23,6 +23,7 @@ var ( "collection-management", "command-monitoring", "command-monitoring/logging", + "connection-monitoring-and-pooling/logging", "sessions", "retryable-writes/unified", "client-side-encryption/unified", diff --git a/examples/_logger/custom/main.go b/mongo/options/example_test.go similarity index 61% rename from examples/_logger/custom/main.go rename to mongo/options/example_test.go index 52fa459dfa..66c8f58b19 100644 --- a/examples/_logger/custom/main.go +++ b/mongo/options/example_test.go @@ -4,34 +4,42 @@ // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -//go:build logrus - -package main +package options_test import ( + "bytes" "context" "fmt" "io" "log" - "os" + "sync" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) -type CustomLogger struct{ io.Writer } +type CustomLogger struct { + io.Writer + mu sync.Mutex +} + +func (logger *CustomLogger) Info(level int, msg string, _ ...interface{}) { + logger.mu.Lock() + defer logger.mu.Unlock() -func (logger CustomLogger) Info(level int, msg string, keysAndValues ...interface{}) { - logger.Write([]byte(fmt.Sprintf("level=%d msg=%s keysAndValues=%v", level, msg, keysAndValues))) + fmt.Fprintf(logger, "level=%d msg=%s\n", level, msg) } -func (logger CustomLogger) Error(err error, msg string, keysAndValues ...interface{}) { - logger.Write([]byte(fmt.Sprintf("err=%v msg=%s keysAndValues=%v", err, msg, keysAndValues))) +func (logger *CustomLogger) Error(err error, msg string, _ ...interface{}) { + logger.mu.Lock() + defer logger.mu.Unlock() + + fmt.Fprintf(logger, "err=%v msg=%s\n", err, msg) } -func main() { - sink := CustomLogger{os.Stdout} +func ExampleClientOptions_SetLoggerOptions_customLogger() { + buf := bytes.NewBuffer(nil) + sink := &CustomLogger{Writer: buf} // Create a client with our logger options. loggerOptions := options. @@ -46,6 +54,7 @@ func main() { SetLoggerOptions(loggerOptions) client, err := mongo.Connect(context.TODO(), clientOptions) + if err != nil { log.Fatalf("error connecting to MongoDB: %v", err) } @@ -55,8 +64,11 @@ func main() { // Make a database request to test our logging solution. coll := client.Database("test").Collection("test") - _, err = coll.InsertOne(context.TODO(), bson.D{{"Alice", "123"}}) + _, err = coll.InsertOne(context.TODO(), map[string]string{"foo": "bar"}) if err != nil { log.Fatalf("InsertOne failed: %v", err) } + + // Print the logs. 
+ fmt.Println(buf.String()) } diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-logging.json b/testdata/connection-monitoring-and-pooling/logging/connection-logging.json new file mode 100644 index 0000000000..86d4357420 --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/logging/connection-logging.json @@ -0,0 +1,497 @@ +{ + "description": "connection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "Create a client, run a command, and close the client", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "connection": "debug" + } + } + } + ] + } + }, + { + "name": "listDatabases", + "object": "client", + "arguments": { + "filter": {} + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + 
"serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection closed", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "Connection pool was closed" + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool closed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Connection checkout fails due to error establishing connection", + "runOnRequirements": [ + { + "auth": true, + "minServerVersion": "4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryReads": false, + "appname": "clientAppName", + "heartbeatFrequencyMS": 10000 + }, + "observeLogMessages": { + "connection": "debug" + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "saslContinue" + ], + "closeConnection": true, + "appName": "clientAppName" + } + } + } + }, + { + "name": "listDatabases", + "object": "client", + "arguments": { + "filter": {} + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool cleared", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "unordered": true, + "data": { + "message": "Connection closed", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while using the connection", + "error": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "connection", + "unordered": true, + "data": { + "message": "Connection checkout failed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while trying to establish a new connection", + "error": { + 
"$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml b/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml new file mode 100644 index 0000000000..ef5576d753 --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/logging/connection-logging.yml @@ -0,0 +1,222 @@ +description: "connection-logging" + +schemaVersion: "1.13" + +runOnRequirements: + - topologies: + - single # The number of log messages is different for each topology since there is a connection pool per host. + +createEntities: + - client: + id: &failPointClient failPointClient + +tests: + - description: "Create a client, run a command, and close the client" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + observeLogMessages: + connection: debug + - name: listDatabases + object: *client + arguments: + filter: {} + - name: close + object: *client + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection ready" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # The next three expected logs are for ending a session. + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection closed" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "Connection pool was closed" + + - level: debug + component: connection + data: + message: "Connection pool closed" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # This test exists to provide coverage of checkout failed and pool cleared events. 
+ - description: "Connection checkout fails due to error establishing connection" + runOnRequirements: + - auth: true + minServerVersion: "4.0" # failCommand was added to mongod in 4.0 + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + retryReads: false + appname: &clientAppName clientAppName + # use a high heartbeatFrequencyMS to avoid a successful monitor check marking the pool as + # ready (and emitting another event) during the course of test execution. + heartbeatFrequencyMS: 10000 + observeLogMessages: + connection: debug + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["saslContinue"] + closeConnection: true + appName: *clientAppName + - name: listDatabases + object: *client + arguments: + filter: {} + expectError: + isClientError: true + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool cleared" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection closed" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "An error occurred while using the connection" + error: { $$exists: true } + unordered: true + + - level: debug + component: connection + data: + message: "Connection checkout failed" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "An error occurred while trying to establish a new connection" + error: { $$exists: true } + unordered: true diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json new file mode 100644 index 0000000000..e67804915c --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.json @@ -0,0 +1,451 @@ +{ + "description": "connection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "tests": [ + { + "description": "Options should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "connectionReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "minPoolSize": 1, + "maxPoolSize": 5, + "maxIdleTimeMS": 10000 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "connectionReadyEvent": {} + }, + "count": 1 + } + } 
+ ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "minPoolSize": 1, + "maxPoolSize": 5, + "maxIdleTimeMS": 10000 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "maxConnecting should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "maxConnecting": 5 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "maxConnecting": 5 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueTimeoutMS should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueTimeoutMS": 10000 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueTimeoutMS": 10000 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueSize should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + 
"object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueSize": 100 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueSize": 100 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueMultiple should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueSize": 5 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueMultiple": 5 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + } + ] +} diff --git a/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml new file mode 100644 index 0000000000..b22693a92b --- /dev/null +++ b/testdata/connection-monitoring-and-pooling/logging/connection-pool-options.yml @@ -0,0 +1,253 @@ +description: "connection-logging" + +schemaVersion: "1.13" + +runOnRequirements: + - topologies: + - single # The number of log messages is different for each topology since there is a connection pool per host. + +tests: + - description: "Options should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait on a connection ready event for the connection created in the background. + # This is to avoid raciness around whether the background thread has created the connection + # (and whether corresponding log messages have been generated) by the time log message assertions + # are made. 
+ observeEvents: + - connectionReadyEvent + observeLogMessages: + connection: debug + uriOptions: + minPoolSize: 1 + maxPoolSize: 5 + maxIdleTimeMS: 10000 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + connectionReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + minPoolSize: 1 + maxPoolSize: 5 + maxIdleTimeMS: 10000 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection ready" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers who have not done DRIVERS-1943 will need to skip this test. + - description: "maxConnecting should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + maxConnecting: 5 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + maxConnecting: 5 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueTimeoutMS will need to skip this test. + - description: "waitQueueTimeoutMS should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueTimeoutMS: 10000 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueTimeoutMS: 10000 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueSize will need to skip this test. 
+ - description: "waitQueueSize should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueSize: 100 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueSize: 100 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueMultiple will need to skip this test. + - description: "waitQueueMultiple should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueSize: 5 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueMultiple: 5 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index 948ec254d8..27f97401ab 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -1803,8 +1803,9 @@ func (op Operation) publishStartedEvent(ctx context.Context, info startedInforma ServerPort: port, ServiceID: info.serviceID, }, - "command", formattedCmd, - "databaseName", op.Database)...) + logger.KeyCommand, formattedCmd, + logger.KeyDriverConnectionID, info.connID, + logger.KeyDatabaseName, op.Database)...) } @@ -1854,8 +1855,9 @@ func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInfor ServerPort: port, ServiceID: info.serviceID, }, - "durationMS", info.duration.Milliseconds(), - "reply", formattedReply)...) + logger.KeyDurationMS, info.duration.Milliseconds(), + logger.KeyDriverConnectionID, info.connID, + logger.KeyReply, formattedReply)...) } if op.canLogCommandMessage() && !info.success() { @@ -1875,8 +1877,9 @@ func (op Operation) publishFinishedEvent(ctx context.Context, info finishedInfor ServerPort: port, ServiceID: info.serviceID, }, - "durationMS", info.duration.Milliseconds(), - "failure", formattedReply)...) + logger.KeyDurationMS, info.duration.Milliseconds(), + logger.KeyDriverConnectionID, info.connID, + logger.KeyFailure, formattedReply)...) } // If the finished event cannot be published, return early. 
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 799c0a3484..fabb485601 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -9,12 +9,14 @@ package topology import ( "context" "fmt" + "net" "sync" "sync/atomic" "time" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/event" + "go.mongodb.org/mongo-driver/internal/logger" "go.mongodb.org/mongo-driver/mongo/address" "go.mongodb.org/mongo-driver/x/mongo/driver" ) @@ -73,6 +75,7 @@ type poolConfig struct { MaxIdleTime time.Duration MaintainInterval time.Duration PoolMonitor *event.PoolMonitor + Logger *logger.Logger handshakeErrFn func(error, uint64, *primitive.ObjectID) } @@ -91,6 +94,7 @@ type pool struct { maxSize uint64 maxConnecting uint64 monitor *event.PoolMonitor + logger *logger.Logger // handshakeErrFn is used to handle any errors that happen during connection establishment and // handshaking. @@ -129,18 +133,56 @@ func (p *pool) getState() int { return p.state } +func mustLogPoolMessage(pool *pool) bool { + return pool.logger != nil && pool.logger.LevelComponentEnabled( + logger.LevelDebug, logger.ComponentConnection) +} + +func logPoolMessage(pool *pool, msg string, keysAndValues ...interface{}) { + host, port, err := net.SplitHostPort(pool.address.String()) + if err != nil { + host = pool.address.String() + port = "" + } + + pool.logger.Print(logger.LevelDebug, + logger.ComponentConnection, + msg, + logger.SerializeConnection(logger.Connection{ + Message: msg, + ServerHost: host, + ServerPort: port, + }, keysAndValues...)...) + +} + +type reason struct { + loggerConn string + event string +} + // connectionPerished checks if a given connection is perished and should be removed from the pool. -func connectionPerished(conn *connection) (string, bool) { +func connectionPerished(conn *connection) (reason, bool) { switch { case conn.closed(): // A connection would only be closed if it encountered a network error during an operation and closed itself. - return event.ReasonError, true + return reason{ + loggerConn: logger.ReasonConnClosedError, + event: event.ReasonError, + }, true case conn.idleTimeoutExpired(): - return event.ReasonIdle, true + return reason{ + loggerConn: logger.ReasonConnClosedIdle, + event: event.ReasonIdle, + }, true case conn.pool.stale(conn): - return event.ReasonStale, true + return reason{ + loggerConn: logger.ReasonConnClosedStale, + event: event.ReasonStale, + }, true } - return "", false + + return reason{}, false } // newPool creates a new pool. It will use the provided options when creating connections. @@ -165,6 +207,7 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool { maxSize: config.MaxPoolSize, maxConnecting: maxConnecting, monitor: config.PoolMonitor, + logger: config.Logger, handshakeErrFn: config.handshakeErrFn, connOpts: connOpts, generation: newPoolGenerationMap(), @@ -202,6 +245,17 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool { go pool.maintain(ctx, pool.backgroundDone) } + if mustLogPoolMessage(pool) { + keysAndValues := logger.KeyValues{ + logger.KeyMaxIdleTimeMS, config.MaxIdleTime.Milliseconds(), + logger.KeyMinPoolSize, config.MinPoolSize, + logger.KeyMaxPoolSize, config.MaxPoolSize, + logger.KeyMaxConnecting, config.MaxConnecting, + } + + logPoolMessage(pool, logger.ConnectionPoolCreated, keysAndValues...) 
+ } + if pool.monitor != nil { pool.monitor.Event(&event.PoolEvent{ Type: event.PoolCreated, @@ -239,6 +293,10 @@ func (p *pool) ready() error { p.state = poolReady p.stateMu.Unlock() + if mustLogPoolMessage(p) { + logPoolMessage(p, logger.ConnectionPoolReady) + } + // Send event.PoolReady before resuming the maintain() goroutine to guarantee that the // "pool ready" event is always sent before maintain() starts creating connections. if p.monitor != nil { @@ -344,10 +402,17 @@ func (p *pool) close(ctx context.Context) { // Now that we're not holding any locks, remove all of the connections we collected from the // pool. for _, conn := range conns { - _ = p.removeConnection(conn, event.ReasonPoolClosed) + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedPoolClosed, + event: event.ReasonPoolClosed, + }, nil) _ = p.closeConnection(conn) // We don't care about errors while closing the connection. } + if mustLogPoolMessage(p) { + logPoolMessage(p, logger.ConnectionPoolClosed) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.PoolClosedEvent, @@ -379,6 +444,10 @@ func (p *pool) unpinConnectionFromTransaction() { // ready, checkOut returns an error. // Based partially on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1324 func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { + if mustLogPoolMessage(p) { + logPoolMessage(p, logger.ConnectionCheckoutStarted) + } + // TODO(CSOT): If a Timeout was specified at any level, respect the Timeout is server selection, connection // TODO checkout. if p.monitor != nil { @@ -397,6 +466,15 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { switch p.state { case poolClosed: p.stateMu.RUnlock() + + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed, + } + + logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetFailed, @@ -408,6 +486,15 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { case poolPaused: err := poolClearedError{err: p.lastClearErr, address: p.address} p.stateMu.RUnlock() + + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyReason, logger.ReasonConnCheckoutFailedError, + } + + logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetFailed, @@ -442,6 +529,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { p.stateMu.RUnlock() if w.err != nil { + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyReason, logger.ReasonConnCheckoutFailedError, + } + + logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetFailed, @@ -452,6 +547,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { return nil, w.err } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, w.conn.poolID, + } + + logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) 
+ } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetSucceeded, @@ -459,6 +562,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { ConnectionID: w.conn.poolID, }) } + return w.conn, nil } @@ -471,6 +575,15 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { select { case <-w.ready: if w.err != nil { + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyReason, logger.ReasonConnCheckoutFailedError, + logger.KeyError, w.err.Error(), + } + + logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetFailed, @@ -478,9 +591,18 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Reason: event.ReasonConnectionErrored, }) } + return nil, w.err } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, w.conn.poolID, + } + + logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetSucceeded, @@ -490,6 +612,14 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { } return w.conn, nil case <-ctx.Done(): + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyReason, logger.ReasonConnCheckoutFailedTimout, + } + + logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.GetFailed, @@ -497,6 +627,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { Reason: event.ReasonTimedOut, }) } + return nil, WaitQueueTimeoutError{ Wrapped: ctx.Err(), PinnedCursorConnections: atomic.LoadUint64(&p.pinnedCursorConnections), @@ -531,7 +662,7 @@ func (p *pool) getGenerationForNewConnection(serviceID *primitive.ObjectID) uint } // removeConnection removes a connection from the pool and emits a "ConnectionClosed" event. -func (p *pool) removeConnection(conn *connection, reason string) error { +func (p *pool) removeConnection(conn *connection, reason reason, err error) error { if conn == nil { return nil } @@ -561,12 +692,26 @@ func (p *pool) removeConnection(conn *connection, reason string) error { p.generation.removeConnection(conn.desc.ServiceID) } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.poolID, + logger.KeyReason, reason.loggerConn, + } + + if err != nil { + keysAndValues.Add(logger.KeyError, err.Error()) + } + + logPoolMessage(p, logger.ConnectionClosed, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionClosed, Address: p.address.String(), ConnectionID: conn.poolID, - Reason: reason, + Reason: reason.event, + Error: err, }) } @@ -583,6 +728,14 @@ func (p *pool) checkIn(conn *connection) error { return ErrWrongPool } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.poolID, + } + + logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...) 
+ } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionReturned, @@ -613,7 +766,7 @@ func (p *pool) checkInNoEvent(conn *connection) error { conn.bumpIdleDeadline() if reason, perished := connectionPerished(conn); perished { - _ = p.removeConnection(conn, reason) + _ = p.removeConnection(conn, reason, nil) go func() { _ = p.closeConnection(conn) }() @@ -621,7 +774,11 @@ func (p *pool) checkInNoEvent(conn *connection) error { } if conn.pool.getState() == poolClosed { - _ = p.removeConnection(conn, event.ReasonPoolClosed) + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedPoolClosed, + event: event.ReasonPoolClosed, + }, nil) + go func() { _ = p.closeConnection(conn) }() @@ -708,6 +865,14 @@ func (p *pool) clear(err error, serviceID *primitive.ObjectID) { p.createConnectionsCond.L.Unlock() } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyServiceID, serviceID, + } + + logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...) + } + if sendEvent && p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.PoolCleared, @@ -735,7 +900,7 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool { } if reason, perished := connectionPerished(conn); perished { - _ = conn.pool.removeConnection(conn, reason) + _ = conn.pool.removeConnection(conn, reason, nil) go func() { _ = conn.pool.closeConnection(conn) }() @@ -831,6 +996,14 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { continue } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.poolID, + } + + logPoolMessage(p, logger.ConnectionCreated, keysAndValues...) + } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionCreated, @@ -855,11 +1028,24 @@ func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID) } - _ = p.removeConnection(conn, event.ReasonError) + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedError, + event: event.ReasonError, + }, err) + _ = p.closeConnection(conn) + continue } + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.poolID, + } + + logPoolMessage(p, logger.ConnectionReady, keysAndValues...) 
+ } + if p.monitor != nil { p.monitor.Event(&event.PoolEvent{ Type: event.ConnectionReady, @@ -978,7 +1164,7 @@ func (p *pool) removePerishedConns() { if reason, perished := connectionPerished(conn); perished { p.idleConns[i] = nil - _ = p.removeConnection(conn, reason) + _ = p.removeConnection(conn, reason, nil) go func() { _ = p.closeConnection(conn) }() @@ -1045,7 +1231,9 @@ func (w *wantConn) tryDeliver(conn *connection, err error) bool { if w.conn == nil && w.err == nil { panic("x/mongo/driver/topology: internal error: misuse of tryDeliver") } + close(w.ready) + return true } diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index d416f6c195..528cfa8e8a 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -176,6 +176,7 @@ func NewServer(addr address.Address, topologyID primitive.ObjectID, opts ...Serv MaxIdleTime: cfg.poolMaxIdleTime, MaintainInterval: cfg.poolMaintainInterval, PoolMonitor: cfg.poolMonitor, + Logger: cfg.logger, handshakeErrFn: s.ProcessHandshakeError, } diff --git a/x/mongo/driver/topology/server_options.go b/x/mongo/driver/topology/server_options.go index 73819f9fc3..f6126a9edc 100644 --- a/x/mongo/driver/topology/server_options.go +++ b/x/mongo/driver/topology/server_options.go @@ -12,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsoncodec" "go.mongodb.org/mongo-driver/event" + "go.mongodb.org/mongo-driver/internal/logger" "go.mongodb.org/mongo-driver/x/mongo/driver" "go.mongodb.org/mongo-driver/x/mongo/driver/session" ) @@ -36,6 +37,7 @@ type serverConfig struct { minConns uint64 maxConnecting uint64 poolMonitor *event.PoolMonitor + logger *logger.Logger poolMaxIdleTime time.Duration poolMaintainInterval time.Duration } @@ -193,3 +195,10 @@ func WithServerLoadBalanced(fn func(bool) bool) ServerOption { cfg.loadBalanced = fn(cfg.loadBalanced) } } + +// WithLogger configures the logger for the server to use. +func WithLogger(fn func() *logger.Logger) ServerOption { + return func(cfg *serverConfig) { + cfg.logger = fn() + } +} diff --git a/x/mongo/driver/topology/topology_options.go b/x/mongo/driver/topology/topology_options.go index 98b71ea383..bfc35489ff 100644 --- a/x/mongo/driver/topology/topology_options.go +++ b/x/mongo/driver/topology/topology_options.go @@ -8,11 +8,13 @@ package topology import ( "crypto/tls" + "fmt" "net/http" "strings" "time" "go.mongodb.org/mongo-driver/event" + "go.mongodb.org/mongo-driver/internal/logger" "go.mongodb.org/mongo-driver/mongo/description" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/mongo/driver" @@ -62,7 +64,7 @@ func NewConfig(co *options.ClientOptions, clock *session.ClusterClock) (*Config, var connOpts []ConnectionOption var serverOpts []ServerOption - cfgp := new(Config) + cfgp := &Config{} // Set the default "ServerSelectionTimeout" to 30 seconds. cfgp.ServerSelectionTimeout = defaultServerSelectionTimeout @@ -224,7 +226,7 @@ func NewConfig(co *options.ClientOptions, clock *session.ClusterClock) (*Config, // MaxConIdleTime if co.MaxConnIdleTime != nil { - connOpts = append(connOpts, WithIdleTimeout( + serverOpts = append(serverOpts, WithConnectionPoolMaxIdleTime( func(time.Duration) time.Duration { return *co.MaxConnIdleTime }, )) } @@ -333,6 +335,24 @@ func NewConfig(co *options.ClientOptions, clock *session.ClusterClock) (*Config, ) } + if opts := co.LoggerOptions; opts != nil { + // Build an internal component-level mapping. 
+ componentLevels := make(map[logger.Component]logger.Level) + for component, level := range opts.ComponentLevels { + componentLevels[logger.Component(component)] = logger.Level(level) + } + + log, err := logger.New(opts.Sink, opts.MaxDocumentLength, componentLevels) + if err != nil { + return nil, fmt.Errorf("error creating logger: %w", err) + } + + serverOpts = append( + serverOpts, + WithLogger(func() *logger.Logger { return log }), + ) + } + serverOpts = append( serverOpts, WithClock(func(*session.ClusterClock) *session.ClusterClock { return clock }),
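NewConfig only constructs a logger.Logger when co.LoggerOptions is set, so the new pool messages are opt-in per client. A rough usage sketch of enabling them; the options-package names (options.Logger, SetComponentLevel, SetLoggerOptions, LogComponentConnection, LogLevelDebug) are assumptions based on the exported client API and may differ slightly from this in-progress change:

```go
package main

import (
	"context"
	"log"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// Enable debug-level logging for the "connection" component only. With no
	// explicit sink configured, the driver falls back to its default IOSink.
	loggerOpts := options.Logger().
		SetComponentLevel(options.LogComponentConnection, options.LogLevelDebug)

	clientOpts := options.Client().
		ApplyURI("mongodb://localhost:27017").
		SetLoggerOptions(loggerOpts).
		SetMaxPoolSize(5) // surfaces as maxPoolSize in "Connection pool created"

	client, err := mongo.Connect(context.Background(), clientOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	// Any operation forces a connection checkout, which should emit the
	// "Connection checkout started", "Connection created", "Connection ready",
	// "Connection checked out", and "Connection checked in" messages.
	if err := client.Ping(context.Background(), nil); err != nil {
		log.Fatal(err)
	}
}
```

The logger is threaded from ClientOptions through serverConfig (WithLogger) into poolConfig, so every per-server pool shares the same Logger instance and component-level filtering.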