From e059c79fb5257166b8bf47c700790c07131d3970 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Mon, 27 Jul 2020 15:13:46 -0400 Subject: [PATCH 1/8] Revert "Update AbstractMongoBackend" This reverts commit 9e379e5c2bebf409f85555fb2af2e1a547241215. --- .../mongo/backend/AbstractMongoBackend.java | 84 +++--------------- .../backend/AbstractMongoBackendTest.java | 87 +------------------ 2 files changed, 17 insertions(+), 154 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java index 2be6192c8..d8bd654ab 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java @@ -13,7 +13,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -33,7 +32,6 @@ import de.bwaldvogel.mongo.oplog.CollectionBackedOplog; import de.bwaldvogel.mongo.oplog.NoopOplog; import de.bwaldvogel.mongo.oplog.Oplog; -import de.bwaldvogel.mongo.util.FutureUtils; import de.bwaldvogel.mongo.wire.BsonConstants; import de.bwaldvogel.mongo.wire.MongoWireProtocolHandler; import de.bwaldvogel.mongo.wire.message.Message; @@ -52,7 +50,7 @@ public abstract class AbstractMongoBackend implements MongoBackend { protected static final String OPLOG_COLLECTION_NAME = "oplog.rs"; - static final String ADMIN_DB_NAME = "admin"; + private static final String ADMIN_DB_NAME = "admin"; private final Map databases = new ConcurrentHashMap<>(); @@ -298,8 +296,8 @@ private MongoCollection resolveCollection(final String namespace) { protected abstract MongoDatabase openOrCreateDatabase(String databaseName); - // handle command synchronously - private Document handleCommandSync(Channel channel, String databaseName, String command, Document query) { + @Override + public Document handleCommand(Channel channel, String databaseName, String command, Document query) { if (command.equalsIgnoreCase("whatsmyuri")) { Document response = new Document(); InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); @@ -329,51 +327,13 @@ private Document handleCommandSync(Channel channel, String databaseName, String } else if (command.equalsIgnoreCase("killCursors")) { return handleKillCursors(query); } - return null; - } - - @Override - public Document handleCommand(Channel channel, String databaseName, String command, Document query) { - Document commandSync = handleCommandSync(channel, databaseName, command, query); - if (commandSync != null) { - return commandSync; - } if (databaseName.equals(ADMIN_DB_NAME)) { return handleAdminCommand(command, query); + } else { + MongoDatabase db = resolveDatabase(databaseName); + return db.handleCommand(channel, command, query, oplog); } - - return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog); - } - - @Override - public CompletionStage handleCommandAsync(Channel channel, String database, - String command, Document query) { - if ("dropDatabase".equalsIgnoreCase(command)) { - return dropDatabaseAsync(database) - .handle((aVoid, ex) -> { - Document response = new Document("dropped", database); - if (ex != null) { - response.put("errmsg", ex.getMessage()); - response.put("ok", 0.0); - log.error("dropDatabase " + database + " error!", ex); - } else { - Utils.markOkay(response); - } - return response; - }); - } - - Document commandSync = handleCommandSync(channel, database, command, query); - if (commandSync != null) { - return FutureUtils.wrap(() -> commandSync); - } - - if (database.equals(ADMIN_DB_NAME)) { - return FutureUtils.wrap(() -> handleAdminCommand(command, query)); - } - - return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog); } @Override @@ -384,12 +344,8 @@ public Collection getCurrentOperations(MongoQuery query) { @Override public QueryResult handleQuery(MongoQuery query) { - return resolveDatabase(query).handleQuery(query); - } - - @Override - public CompletionStage handleQueryAsync(MongoQuery query) { - return resolveDatabase(query).handleQueryAsync(query); + MongoDatabase db = resolveDatabase(query); + return db.handleQuery(query); } @Override @@ -410,32 +366,20 @@ public QueryResult handleGetMore(MongoGetMore getMore) { @Override public void handleInsert(MongoInsert insert) { - resolveDatabase(insert).handleInsert(insert, oplog); - } - - @Override - public CompletionStage handleInsertAsync(MongoInsert insert) { - return resolveDatabase(insert).handleInsertAsync(insert, oplog); + MongoDatabase db = resolveDatabase(insert); + db.handleInsert(insert, oplog); } @Override public void handleDelete(MongoDelete delete) { - resolveDatabase(delete).handleDelete(delete, oplog); - } - - @Override - public CompletionStage handleDeleteAsync(MongoDelete delete) { - return resolveDatabase(delete).handleDeleteAsync(delete, oplog); + MongoDatabase db = resolveDatabase(delete); + db.handleDelete(delete, oplog); } @Override public void handleUpdate(MongoUpdate update) { - resolveDatabase(update).handleUpdate(update, oplog); - } - - @Override - public CompletionStage handleUpdateAsync(MongoUpdate update) { - return resolveDatabase(update).handleUpdateAsync(update, oplog); + MongoDatabase db = resolveDatabase(update); + db.handleUpdate(update, oplog); } @Override diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java index d2f207712..e87e11f48 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java @@ -1,10 +1,11 @@ package de.bwaldvogel.mongo.backend; -import java.net.InetSocketAddress; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletionStage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -15,19 +16,10 @@ import de.bwaldvogel.mongo.exception.CursorNotFoundException; import de.bwaldvogel.mongo.wire.message.MongoGetMore; import de.bwaldvogel.mongo.wire.message.MongoKillCursors; -import io.netty.channel.Channel; -import org.mockito.Mockito; - -import static de.bwaldvogel.mongo.backend.AbstractMongoBackend.ADMIN_DB_NAME; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; class AbstractMongoBackendTest { private MongoBackend backend; - private MongoBackend backendWithError; private CursorRegistry cursorRegistry; @BeforeEach @@ -38,31 +30,10 @@ public void setup() { AbstractMongoBackendTest.this.cursorRegistry = getCursorRegistry(); } - @Override - protected MongoDatabase openOrCreateDatabase(String databaseName) { - MongoDatabase mockDatabase = Mockito.mock(AbstractMongoDatabase.class); - - Document fakeResponse = new Document(); - Utils.markOkay(fakeResponse); - fakeResponse.put("message", "fakeResponse"); - - when(mockDatabase.handleCommand(any(), any(), any(), any())).thenReturn(fakeResponse); - - return mockDatabase; - } - }; - - backendWithError = new AbstractMongoBackend() { - @Override protected MongoDatabase openOrCreateDatabase(String databaseName) { return null; } - - @Override - public void dropDatabase(String database) { - throw new RuntimeException("unexpected"); - } }; } @@ -102,56 +73,4 @@ void testHandleKillCursor() { assertThat(cursorRegistry.getCursor(cursor2.getId())).isNotNull(); } - @Test - void testHandleCommand() { - Channel channel = Mockito.mock(Channel.class); - when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.1.254", 27017)); - - Document response = backend.handleCommand(channel, null, "whatsmyuri", null); - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - assertThat(response.get("you")).isEqualTo("127.0.1.254:27017"); - } - - @Test - void testHandleAdminCommand() { - Channel channel = Mockito.mock(Channel.class); - - Document response = backend.handleCommand(channel, ADMIN_DB_NAME, "ping", null); - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - } - - @Test - void testMongoDatabaseHandleCommand() { - Channel channel = Mockito.mock(Channel.class); - - Document response = backend.handleCommand(channel, "mockDatabase", "find", null); - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - assertThat(response.get("message")).isEqualTo("fakeResponse"); - } - - @Test - void testHandleCommandAsyncDropDatabase() throws Exception { - Channel channel = Mockito.mock(Channel.class); - - CompletionStage responseFuture = backend.handleCommandAsync(channel, "mockDatabase", "dropDatabase", null); - Document response = responseFuture.toCompletableFuture().get(); - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - assertThat(response.get("dropped")).isEqualTo("mockDatabase"); - } - - @Test - void testHandleCommandAsyncDropDatabaseError() throws Exception { - Channel channel = Mockito.mock(Channel.class); - - CompletionStage responseFuture = backendWithError.handleCommandAsync(channel, "mockDatabase", "dropDatabase", null); - Document response = responseFuture.toCompletableFuture().get(); - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(0.0); - assertThat(response.get("dropped")).isEqualTo("mockDatabase"); - assertThat(response.get("errmsg")).isEqualTo("unexpected"); - } } From 44e02f3e47d030f45ab80886af564638a49dba73 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Mon, 27 Jul 2020 15:22:54 -0400 Subject: [PATCH 2/8] Revert "Update AbstractMongoDatabase" This reverts commit 8481016960431c9741ce7d4740148c1ad5b11fd2. --- .../mongo/backend/AbstractMongoDatabase.java | 76 ++----------- .../backend/AbstractMongoDatabaseTest.java | 101 ------------------ 2 files changed, 7 insertions(+), 170 deletions(-) delete mode 100644 core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java index 492a42b28..9309f8ddd 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java @@ -12,7 +12,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -32,7 +31,6 @@ import de.bwaldvogel.mongo.exception.NoSuchCommandException; import de.bwaldvogel.mongo.oplog.NoopOplog; import de.bwaldvogel.mongo.oplog.Oplog; -import de.bwaldvogel.mongo.util.FutureUtils; import de.bwaldvogel.mongo.wire.message.MongoDelete; import de.bwaldvogel.mongo.wire.message.MongoInsert; import de.bwaldvogel.mongo.wire.message.MongoQuery; @@ -96,18 +94,17 @@ public String toString() { return getClass().getSimpleName() + "(" + getDatabaseName() + ")"; } - private Document commandError(Channel channel, String command, Document query) { + @Override + public Document handleCommand(Channel channel, String command, Document query, Oplog oplog) { // getlasterror must not clear the last error if (command.equalsIgnoreCase("getlasterror")) { return commandGetLastError(channel, command, query); } else if (command.equalsIgnoreCase("reseterror")) { return commandResetError(channel); } - return null; - } - // handle command synchronously - private Document handleCommandSync(Channel channel, String command, Document query, Oplog oplog) { + clearLastStatus(channel); + if (command.equalsIgnoreCase("find")) { return commandFind(command, query); } else if (command.equalsIgnoreCase("insert")) { @@ -166,34 +163,6 @@ private Document handleCommandSync(Channel channel, String command, Document que throw new NoSuchCommandException(command); } - @Override - public Document handleCommand(Channel channel, String command, Document query, Oplog oplog) { - Document commandErrorDocument = commandError(channel, command, query); - if (commandErrorDocument != null) { - return commandErrorDocument; - } - - clearLastStatus(channel); - - return handleCommandSync(channel, command, query, oplog); - } - - @Override - public CompletionStage handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) { - Document commandErrorDocument = commandError(channel, command, query); - if (commandErrorDocument != null) { - return FutureUtils.wrap(() -> commandErrorDocument); - } - - clearLastStatus(channel); - - if ("find".equalsIgnoreCase(command)) { - return commandFindAsync(command, query); - } - - return FutureUtils.wrap(() -> handleCommandSync(channel, command, query, oplog)); - } - private Document listCollections() { List firstBatch = new ArrayList<>(); for (String namespace : listCollectionNamespaces()) { @@ -207,7 +176,8 @@ private Document listCollections() { collectionDescription.put("options", collectionOptions); collectionDescription.put("info", new Document("readOnly", false)); collectionDescription.put("type", "collection"); - collectionDescription.put("idIndex", getPrimaryKeyIndexDescription(namespace)); + collectionDescription.put("idIndex", getPrimaryKeyIndexDescription(namespace) + ); firstBatch.add(collectionDescription); } @@ -254,18 +224,6 @@ private Document commandFind(String command, Document query) { return toCursorResponse(collection, queryResult); } - private CompletionStage commandFindAsync(String command, Document query) { - String collectionName = (String) query.get(command); - MongoCollection

collection = resolveCollection(collectionName, false); - if (collection == null) { - return FutureUtils.wrap(() -> Utils.firstBatchCursorResponse(getFullCollectionNamespace(collectionName), - Collections.emptyList())); - } - QueryParameters queryParameters = toQueryParameters(query); - return collection.handleQueryAsync(queryParameters) - .thenApply(queryResult -> toCursorResponse(collection, queryResult)); - } - private static QueryParameters toQueryParameters(Document query) { int numberToSkip = ((Number) query.getOrDefault("skip", 0)).intValue(); int numberToReturn = ((Number) query.getOrDefault("limit", 0)).intValue(); @@ -648,26 +606,6 @@ public QueryResult handleQuery(MongoQuery query) { return collection.handleQuery(queryParameters); } - @Override - public CompletionStage handleQueryAsync(MongoQuery query) { - clearLastStatus(query.getChannel()); - String collectionName = query.getCollectionName(); - MongoCollection

collection = resolveCollection(collectionName, false); - if (collection == null) { - return FutureUtils.wrap(QueryResult::new); - } - int numberToSkip = query.getNumberToSkip(); - int batchSize = query.getNumberToReturn(); - - if (batchSize < -1) { - // actually: request to close cursor automatically - batchSize = -batchSize; - } - - QueryParameters queryData = toQueryParameters(query, numberToSkip, batchSize); - return collection.handleQueryAsync(queryData); - } - @Override public void handleClose(Channel channel) { lastResults.remove(channel); @@ -1031,7 +969,7 @@ public void moveCollection(MongoDatabase oldDatabase, MongoCollection collect new Document("$set", new Document("ns", newCollection.getFullName())), ArrayFilters.empty(), true, false, NoopOplog.get()); - namespaces.insertDocuments(newDocuments, true); + namespaces.insertDocuments(newDocuments); } protected String getFullCollectionNamespace(String collectionName) { diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java deleted file mode 100644 index 3cfc6537a..000000000 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabaseTest.java +++ /dev/null @@ -1,101 +0,0 @@ -package de.bwaldvogel.mongo.backend; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletionStage; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; - -import de.bwaldvogel.mongo.MongoCollection; -import de.bwaldvogel.mongo.bson.Document; -import io.netty.channel.Channel; - -class AbstractMongoDatabaseTest { - - private AbstractMongoDatabase database; - private CursorRegistry cursorRegistry; - - @BeforeEach - public void setup() { - cursorRegistry = Mockito.mock(CursorRegistry.class); - - database = new AbstractMongoDatabase("testdb", cursorRegistry) { - - @Override - protected long getFileSize() { - return 0; - } - - @Override - protected long getStorageSize() { - return 0; - } - - @Override - protected Index openOrCreateUniqueIndex(String collectionName, String indexName, List keys, boolean sparse) { - return null; - } - - @Override - protected MongoCollection openOrCreateCollection(String collectionName, CollectionOptions options) { - MongoCollection collection = (MongoCollection) Mockito.mock(MongoCollection.class); - when(collection.getCollectionName()).thenReturn("mockCollection"); - - QueryResult queryResult = new QueryResult(Collections.singletonList(new Document("_id", 1))); - - when(collection.handleQuery(any(QueryParameters.class))).thenReturn(queryResult); - - when(collection.handleQueryAsync(any())).thenCallRealMethod(); - - return collection; - } - }; - - database.initializeNamespacesAndIndexes(); - } - - @Test - void testHandleCommandAsyncFindReturnEmpty() throws Exception { - Channel channel = Mockito.mock(Channel.class); - - Document query = new Document(); - query.put("find", "testCollection"); - - CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null); - Document response = responseFuture.toCompletableFuture().get(); - - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - } - - @Test - void testHandleCommandAsyncFindReturnSomething() throws Exception { - Channel channel = Mockito.mock(Channel.class); - - Document query = new Document(); - query.put("find", "mockCollection"); - - CompletionStage responseFuture = database.handleCommandAsync(channel, "find", query, null); - Document response = responseFuture.toCompletableFuture().get(); - - assertThat(response).isNotNull(); - assertThat(response.get("ok")).isEqualTo(1.0); - - Document cursor = (Document) response.get("cursor"); - assertThat(cursor).isNotNull(); - - List firstBatch = (List) cursor.get("firstBatch"); - assertThat(firstBatch).isNotNull(); - assertThat(firstBatch).hasSize(1); - - Document doc = firstBatch.get(0); - assertThat(doc).isNotNull(); - assertThat(doc.get("_id")).isEqualTo(1); - } -} From 27e8bce5e1de2063f0231719615b1eff2be762c4 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Thu, 25 Jun 2020 00:33:29 -0400 Subject: [PATCH 3/8] Update MongoDatabaseHandler log unknown error unwrap CompletionException enhance error handling --- .../mongo/wire/MongoDatabaseHandler.java | 138 +++++++++++------- .../mongo/wire/MongoDatabaseHandlerTest.java | 134 ++++++++++------- 2 files changed, 172 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandler.java b/core/src/main/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandler.java index e65d04774..6754fbbf5 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandler.java +++ b/core/src/main/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandler.java @@ -2,8 +2,10 @@ import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +35,7 @@ public class MongoDatabaseHandler extends SimpleChannelInboundHandler { - private static final Logger log = LoggerFactory.getLogger(MongoWireProtocolHandler.class); + private static final Logger log = LoggerFactory.getLogger(MongoDatabaseHandler.class); private final AtomicInteger idSequence = new AtomicInteger(); private final MongoBackend mongoBackend; @@ -93,63 +95,91 @@ protected void channelRead0(ChannelHandlerContext ctx, ClientRequest object) { // visible for testing CompletionStage handleMessageAsync(MongoMessage message) { - return mongoBackend.handleMessageAsync(message) - .handle((document, ex) -> createResponseMongoMessage(message, document, ex)); + CompletionStage handleMessageAsyncResponse; + try { + handleMessageAsyncResponse = mongoBackend.handleMessageAsync(message); + } catch (Throwable t) { + handleMessageAsyncResponse = FutureUtils.failedFuture(t); + } + return handleMessageAsyncResponse.handle(handleMessageAsyncBiFunc(message)); } - private MongoMessage createResponseMongoMessage(MongoMessage message, Document document, Throwable ex) { - if (ex != null) { - MongoServerException e; - if (ex instanceof MongoServerException) { - e = (MongoServerException) ex; - if (e.isLogError()) { - log.error("failed to handle {}", message.getDocument(), e); + private BiFunction handleMessageAsyncBiFunc(MongoMessage message) { + return (document, t) -> { + if (t != null) { + if (t instanceof CompletionException) { + t = t.getCause(); + } + + MongoServerException e; + if (t instanceof MongoServerException) { + e = (MongoServerException) t; + if (e.isLogError()) { + log.error("failed to handle {}", message.getDocument(), e); + } + } else { + log.error("Unknown error!", t); + e = new MongoServerException("Unknown error: " + t.getMessage(), t); } - } else { - log.error("Unknown error!", ex); - e = new MongoServerException("Unknown error: " + ex.getMessage(), ex); + + document = errorResponse(e, Collections.emptyMap()); } - document = errorResponse(e, Collections.emptyMap()); - } - return new MongoMessage(message.getChannel(), createResponseHeader(message), document); + return new MongoMessage(message.getChannel(), createResponseHeader(message), document); + }; } // visible for testing CompletionStage handleQueryAsync(MongoQuery query) { if (query.getCollectionName().startsWith("$cmd")) { - return handleCommandAsync(query) - .handle((document, ex) -> - createResponseMongoReplyForCommand(query, document, ex)); + CompletionStage handleCommandAsyncResponse; + try { + handleCommandAsyncResponse = handleCommandAsync(query); + } catch (Throwable t) { + handleCommandAsyncResponse = FutureUtils.failedFuture(t); + } + return handleCommandAsyncResponse.handle(handleCommandAsyncBiFunc(query)); } - return mongoBackend.handleQueryAsync(query) - .handle((queryResult, ex) -> - createResponseMongoReplyForQuery(query, queryResult, ex)); + CompletionStage handleQueryResponseFut; + try { + handleQueryResponseFut = mongoBackend.handleQueryAsync(query); + } catch (Throwable t) { + handleQueryResponseFut = FutureUtils.failedFuture(t); + } + return handleQueryResponseFut.handle(handleQueryAsyncBiFunc(query)); } - private MongoReply createResponseMongoReplyForCommand(MongoQuery query, Document document, Throwable t) { - MessageHeader header = createResponseHeader(query); - if (t != null) { - return createResponseMongoReplyForQueryFailure(header, query, t); - } + private BiFunction handleCommandAsyncBiFunc(MongoQuery query) { + return (document, t) -> { + MessageHeader header = createResponseHeader(query); + if (t != null) { + return createResponseMongoReplyForQueryFailure(header, query, t); + } - return new MongoReply(header, - document != null ? Collections.singletonList(document) : Collections.emptyList(), - 0); + return new MongoReply(header, + document != null ? Collections.singletonList(document) : Collections.emptyList(), + 0); + }; } - private MongoReply createResponseMongoReplyForQuery(MongoQuery query, QueryResult queryResult, Throwable t) { - MessageHeader header = createResponseHeader(query); - if (t != null) { - return createResponseMongoReplyForQueryFailure(header, query, t); - } + private BiFunction handleQueryAsyncBiFunc(MongoQuery query) { + return (queryResult, t) -> { + MessageHeader header = createResponseHeader(query); + if (t != null) { + return createResponseMongoReplyForQueryFailure(header, query, t); + } - return new MongoReply(header, - queryResult != null ? queryResult.collectDocuments() : Collections.emptyList(), - queryResult != null ? queryResult.getCursorId() : 0); + return new MongoReply(header, + queryResult != null ? queryResult.collectDocuments() : Collections.emptyList(), + queryResult != null ? queryResult.getCursorId() : 0); + }; } private MongoReply createResponseMongoReplyForQueryFailure(MessageHeader header, MongoQuery query, Throwable t) { + if (t instanceof CompletionException) { + t = t.getCause(); + } + if (t instanceof NoSuchCommandException) { log.error("unknown command: {}", query, t); Map additionalInfo = Collections.singletonMap("bad cmd", query.getQuery()); @@ -171,23 +201,33 @@ private MongoReply createResponseMongoReplyForQueryFailure(MessageHeader header, // visible for testing CompletionStage handleGetMoreAsync(MongoGetMore getMore) { - return mongoBackend.handleGetMoreAsync(getMore) - .handle((queryResult, ex) -> - createResponseMongoReplyForGetMore(getMore, queryResult, ex)); + CompletionStage handleGetMoreAsyncResponse; + try { + handleGetMoreAsyncResponse = mongoBackend.handleGetMoreAsync(getMore); + } catch (Throwable t) { + handleGetMoreAsyncResponse = FutureUtils.failedFuture(t); + } + return handleGetMoreAsyncResponse.handle(handleGetMoreAsyncBiFunc(getMore)); } - private MongoReply createResponseMongoReplyForGetMore(MongoGetMore getMore, QueryResult queryResult, Throwable t) { - MessageHeader header = createResponseHeader(getMore); - if (t != null) { - return createResponseMongoReplyForGetMoreFailure(header, getMore, t); - } + private BiFunction handleGetMoreAsyncBiFunc(MongoGetMore getMore) { + return (queryResult, t) -> { + MessageHeader header = createResponseHeader(getMore); + if (t != null) { + return createResponseMongoReplyForGetMoreFailure(header, getMore, t); + } - return new MongoReply(header, - queryResult != null ? queryResult.collectDocuments() : Collections.emptyList(), - queryResult != null ? queryResult.getCursorId() : 0); + return new MongoReply(header, + queryResult != null ? queryResult.collectDocuments() : Collections.emptyList(), + queryResult != null ? queryResult.getCursorId() : 0); + }; } private MongoReply createResponseMongoReplyForGetMoreFailure(MessageHeader header, MongoGetMore getMore, Throwable t) { + if (t instanceof CompletionException) { + t = t.getCause(); + } + if (t instanceof CursorNotFoundException) { return new MongoReply(header, Collections.emptyList(), diff --git a/core/src/test/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandlerTest.java b/core/src/test/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandlerTest.java index 2d5ef1e7a..f07411176 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandlerTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/wire/MongoDatabaseHandlerTest.java @@ -1,7 +1,15 @@ package de.bwaldvogel.mongo.wire; +import static de.bwaldvogel.mongo.TestUtils.json; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.List; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import de.bwaldvogel.mongo.MongoBackend; @@ -13,20 +21,19 @@ import de.bwaldvogel.mongo.wire.message.MongoReply; import io.netty.channel.Channel; -import static de.bwaldvogel.mongo.TestUtils.json; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - public class MongoDatabaseHandlerTest { + private MongoBackend backend; + private Channel channel; + + @BeforeEach + public void setup() { + backend = mock(MongoBackend.class); + channel = mock(Channel.class); + } + @Test void testWrappedCommand() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); - final Channel channel = mock(Channel.class); - final Document queryDoc = json("'$query': { 'count': 'collectionName' }, '$readPreference': { 'mode': 'secondaryPreferred' }"); final Document subQueryDoc = json("'count': 'collectionName'"); final MongoQuery query = new MongoQuery(channel, null, "dbName.$cmd", 0, 0, queryDoc, null); @@ -40,9 +47,6 @@ void testWrappedCommand() throws Exception { @Test void testNonWrappedCommand() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); - final Channel channel = mock(Channel.class); - final Document queryDoc = json("'count': 'collectionName'"); final MongoQuery query = new MongoQuery(channel, null, "dbName.$cmd", 0, 0, queryDoc, null); @@ -55,11 +59,8 @@ void testNonWrappedCommand() throws Exception { @Test void testHandleQueryUnknownCollection() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); when(backend.handleQueryAsync(any())).thenCallRealMethod(); - final Channel channel = mock(Channel.class); - final MessageHeader header = new MessageHeader(0, 0); final MongoQuery query = new MongoQuery(channel, header, "dbName.$cmd.unknown", 0, 0, null, null); @@ -79,12 +80,9 @@ void testHandleQueryUnknownCollection() throws Exception { @Test void testHandleQueryCommandUnknownError() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); when(backend.handleCommandAsync(any(), any(), any(), any())).thenCallRealMethod(); when(backend.handleCommand(any(), any(), any(), any())).thenThrow(new RuntimeException("unexpected")); - final Channel channel = mock(Channel.class); - final MessageHeader header = new MessageHeader(0, 0); final Document queryDoc = json("'$query': { 'count': 'collectionName' }, '$readPreference': { 'mode': 'secondaryPreferred' }"); final MongoQuery query = new MongoQuery(channel, header, "dbName.$cmd", 0, 0, queryDoc, null); @@ -92,82 +90,107 @@ void testHandleQueryCommandUnknownError() throws Exception { final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); MongoReply responseMongoReply = handler.handleQueryAsync(query).toCompletableFuture().get(); + assertMongoReplyUnknownError(responseMongoReply); + } - assertThat(responseMongoReply).isNotNull(); - List documents = responseMongoReply.getDocuments(); - assertThat(documents).isNotEmpty(); - Document doc = documents.get(0); - assertThat(doc).isNotNull(); - assertThat(doc.get("$err")).isEqualTo("Unknown error: unexpected"); - assertThat(doc.get("errmsg")).isEqualTo("Unknown error: unexpected"); - assertThat(doc.get("ok")).isEqualTo(0); + @Test + void testHandleQueryCommandAsyncUnknownError() throws Exception { + when(backend.handleCommandAsync(any(), any(), any(), any())).thenThrow(new RuntimeException("unexpected")); + + final MessageHeader header = new MessageHeader(0, 0); + final Document queryDoc = json("'$query': { 'count': 'collectionName' }, '$readPreference': { 'mode': 'secondaryPreferred' }"); + final MongoQuery query = new MongoQuery(channel, header, "dbName.$cmd", 0, 0, queryDoc, null); + + final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); + + MongoReply responseMongoReply = handler.handleQueryAsync(query).toCompletableFuture().get(); + assertMongoReplyUnknownError(responseMongoReply); } @Test void testHandleQueryUnknownError() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); when(backend.handleQueryAsync(any())).thenCallRealMethod(); when(backend.handleQuery(any())).thenThrow(new RuntimeException("unexpected")); - final Channel channel = mock(Channel.class); - final MessageHeader header = new MessageHeader(0, 0); final MongoQuery query = new MongoQuery(channel, header, "dbName.find", 0, 0, null, null); final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); MongoReply responseMongoReply = handler.handleQueryAsync(query).toCompletableFuture().get(); + assertMongoReplyUnknownError(responseMongoReply); + } - assertThat(responseMongoReply).isNotNull(); - List documents = responseMongoReply.getDocuments(); - assertThat(documents).isNotEmpty(); - Document doc = documents.get(0); - assertThat(doc).isNotNull(); - assertThat(doc.get("$err")).isEqualTo("Unknown error: unexpected"); - assertThat(doc.get("errmsg")).isEqualTo("Unknown error: unexpected"); - assertThat(doc.get("ok")).isEqualTo(0); + @Test + void testHandleQueryAsyncUnknownError() throws Exception { + when(backend.handleQueryAsync(any())).thenThrow(new RuntimeException("unexpected")); + + final MessageHeader header = new MessageHeader(0, 0); + final MongoQuery query = new MongoQuery(channel, header, "dbName.find", 0, 0, null, null); + + final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); + + MongoReply responseMongoReply = handler.handleQueryAsync(query).toCompletableFuture().get(); + assertMongoReplyUnknownError(responseMongoReply); } @Test void testHandleMessageUnknownError() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); when(backend.handleMessageAsync(any())).thenCallRealMethod(); when(backend.handleMessage(any())).thenThrow(new RuntimeException("unexpected")); - final Channel channel = mock(Channel.class); - final MessageHeader header = new MessageHeader(0, 0); - final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); MongoMessage requestMessage = new MongoMessage(channel, header, new Document("key", "1")); MongoMessage responseMessage = handler.handleMessageAsync(requestMessage).toCompletableFuture().get(); + assertMongoMessageUnknownError(responseMessage); + } - assertThat(responseMessage).isNotNull(); - Document responseMessageDoc = responseMessage.getDocument(); - assertThat(responseMessageDoc).isNotNull(); - assertThat(responseMessageDoc.get("$err")).isEqualTo("Unknown error: unexpected"); - assertThat(responseMessageDoc.get("errmsg")).isEqualTo("Unknown error: unexpected"); - assertThat(responseMessageDoc.get("ok")).isEqualTo(0); + @Test + void testHandleMessageAsyncUnknownError() throws Exception { + when(backend.handleMessageAsync(any())).thenThrow(new RuntimeException("unexpected")); + + final MessageHeader header = new MessageHeader(0, 0); + final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); + + MongoMessage requestMessage = new MongoMessage(channel, header, new Document("key", "1")); + + MongoMessage responseMessage = handler.handleMessageAsync(requestMessage).toCompletableFuture().get(); + assertMongoMessageUnknownError(responseMessage); } @Test void testHandleGetMoreUnknownError() throws Exception { - final MongoBackend backend = mock(MongoBackend.class); when(backend.handleGetMoreAsync(any())).thenCallRealMethod(); when(backend.handleGetMore(any())).thenThrow(new RuntimeException("unexpected")); - final Channel channel = mock(Channel.class); - final MessageHeader header = new MessageHeader(0, 0); + final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); + + MongoGetMore requestGetMore = new MongoGetMore(channel, header, "collectionName", 5, 0); + + MongoReply responseMongoReply = handler.handleGetMoreAsync(requestGetMore).toCompletableFuture().get(); + + assertMongoReplyUnknownError(responseMongoReply); + } + + @Test + void testHandleGetMoreAsyncUnknownError() throws Exception { + when(backend.handleGetMoreAsync(any())).thenThrow(new RuntimeException("unexpected")); + final MessageHeader header = new MessageHeader(0, 0); final MongoDatabaseHandler handler = new MongoDatabaseHandler(backend, null); MongoGetMore requestGetMore = new MongoGetMore(channel, header, "collectionName", 5, 0); MongoReply responseMongoReply = handler.handleGetMoreAsync(requestGetMore).toCompletableFuture().get(); + assertMongoReplyUnknownError(responseMongoReply); + } + + private void assertMongoReplyUnknownError(MongoReply responseMongoReply) { assertThat(responseMongoReply).isNotNull(); List documents = responseMongoReply.getDocuments(); assertThat(documents).isNotEmpty(); @@ -177,4 +200,13 @@ void testHandleGetMoreUnknownError() throws Exception { assertThat(doc.get("errmsg")).isEqualTo("Unknown error: unexpected"); assertThat(doc.get("ok")).isEqualTo(0); } + + private void assertMongoMessageUnknownError(MongoMessage responseMessage) { + assertThat(responseMessage).isNotNull(); + Document responseMessageDoc = responseMessage.getDocument(); + assertThat(responseMessageDoc).isNotNull(); + assertThat(responseMessageDoc.get("$err")).isEqualTo("Unknown error: unexpected"); + assertThat(responseMessageDoc.get("errmsg")).isEqualTo("Unknown error: unexpected"); + assertThat(responseMessageDoc.get("ok")).isEqualTo(0); + } } From e8d9e1bc3037853985fd36a707e8298863ba1e2b Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Fri, 26 Jun 2020 22:16:41 -0400 Subject: [PATCH 4/8] Update AbstractMongoBackend Change method visibility to public for extensibility Add tests to cover synchronous and asynchronous method call --- .../mongo/backend/AbstractMongoBackend.java | 21 +++-- .../backend/AbstractMongoBackendTest.java | 79 ++++++++++++++++++- 2 files changed, 92 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java index d8bd654ab..f353cfdfa 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoBackend.java @@ -50,7 +50,7 @@ public abstract class AbstractMongoBackend implements MongoBackend { protected static final String OPLOG_COLLECTION_NAME = "oplog.rs"; - private static final String ADMIN_DB_NAME = "admin"; + protected static final String ADMIN_DB_NAME = "admin"; private final Map databases = new ConcurrentHashMap<>(); @@ -148,7 +148,7 @@ private Document getLog(String argument) { return response; } - private Document handleAdminCommand(String command, Document query) { + protected Document handleAdminCommand(String command, Document query) { if (command.equalsIgnoreCase("listdatabases")) { List databases = listDatabaseNames().stream() .sorted() @@ -296,8 +296,7 @@ private MongoCollection resolveCollection(final String namespace) { protected abstract MongoDatabase openOrCreateDatabase(String databaseName); - @Override - public Document handleCommand(Channel channel, String databaseName, String command, Document query) { + protected Document handleCommandSync(Channel channel, String databaseName, String command, Document query) { if (command.equalsIgnoreCase("whatsmyuri")) { Document response = new Document(); InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress(); @@ -327,13 +326,21 @@ public Document handleCommand(Channel channel, String databaseName, String comma } else if (command.equalsIgnoreCase("killCursors")) { return handleKillCursors(query); } + return null; + } + + @Override + public Document handleCommand(Channel channel, String databaseName, String command, Document query) { + Document commandResponse = handleCommandSync(channel, databaseName, command, query); + if (commandResponse != null) { + return commandResponse; + } if (databaseName.equals(ADMIN_DB_NAME)) { return handleAdminCommand(command, query); - } else { - MongoDatabase db = resolveDatabase(databaseName); - return db.handleCommand(channel, command, query, oplog); } + + return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog); } @Override diff --git a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java index e87e11f48..777657dd7 100644 --- a/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java +++ b/core/src/test/java/de/bwaldvogel/mongo/backend/AbstractMongoBackendTest.java @@ -1,14 +1,20 @@ package de.bwaldvogel.mongo.backend; +import static de.bwaldvogel.mongo.backend.AbstractMongoBackend.ADMIN_DB_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletionStage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import de.bwaldvogel.mongo.MongoBackend; import de.bwaldvogel.mongo.MongoDatabase; @@ -16,6 +22,7 @@ import de.bwaldvogel.mongo.exception.CursorNotFoundException; import de.bwaldvogel.mongo.wire.message.MongoGetMore; import de.bwaldvogel.mongo.wire.message.MongoKillCursors; +import io.netty.channel.Channel; class AbstractMongoBackendTest { @@ -32,7 +39,15 @@ public void setup() { @Override protected MongoDatabase openOrCreateDatabase(String databaseName) { - return null; + MongoDatabase mockDatabase = Mockito.mock(AbstractMongoDatabase.class); + + Document fakeResponse = new Document(); + Utils.markOkay(fakeResponse); + fakeResponse.put("message", "fakeResponse"); + + when(mockDatabase.handleCommand(any(), any(), any(), any())).thenReturn(fakeResponse); + + return mockDatabase; } }; } @@ -73,4 +88,66 @@ void testHandleKillCursor() { assertThat(cursorRegistry.getCursor(cursor2.getId())).isNotNull(); } + @Test + void testHandleCommandSync() { + Channel channel = Mockito.mock(Channel.class); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.1.254", 27017)); + + Document response = backend.handleCommand(channel, null, "whatsmyuri", null); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + assertThat(response.get("you")).isEqualTo("127.0.1.254:27017"); + } + + @Test + void testHandleCommandAsync() throws Exception { + Channel channel = Mockito.mock(Channel.class); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress("127.0.1.254", 27017)); + + CompletionStage responseFuture = backend.handleCommandAsync(channel, null, "whatsmyuri", null); + Document response = responseFuture.toCompletableFuture().get(); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + assertThat(response.get("you")).isEqualTo("127.0.1.254:27017"); + } + + @Test + void testHandleAdminCommand() { + Channel channel = Mockito.mock(Channel.class); + + Document response = backend.handleCommand(channel, ADMIN_DB_NAME, "ping", null); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + } + + @Test + void testHandleAdminCommandAsync() throws Exception { + Channel channel = Mockito.mock(Channel.class); + + CompletionStage responseFuture = backend.handleCommandAsync(channel, ADMIN_DB_NAME, "ping", null); + Document response = responseFuture.toCompletableFuture().get(); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + } + + @Test + void testMongoDatabaseHandleCommand() { + Channel channel = Mockito.mock(Channel.class); + + Document response = backend.handleCommand(channel, "mockDatabase", "find", null); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + assertThat(response.get("message")).isEqualTo("fakeResponse"); + } + + @Test + void testMongoDatabaseHandleCommandAsync() throws Exception { + Channel channel = Mockito.mock(Channel.class); + + CompletionStage responseFuture = backend.handleCommandAsync(channel, "mockDatabase", "find", null); + Document response = responseFuture.toCompletableFuture().get(); + assertThat(response).isNotNull(); + assertThat(response.get("ok")).isEqualTo(1.0); + assertThat(response.get("message")).isEqualTo("fakeResponse"); + } } From fd58379f64ab8db86113691d9f8a5f0b1c688af8 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Fri, 26 Jun 2020 22:30:38 -0400 Subject: [PATCH 5/8] Update AbstractMongoDatabase Change method visibility to public for extensibility --- .../mongo/backend/AbstractMongoDatabase.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java index 9309f8ddd..603ece10b 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoDatabase.java @@ -39,9 +39,9 @@ public abstract class AbstractMongoDatabase

implements MongoDatabase { - private static final String NAMESPACES_COLLECTION_NAME = "system.namespaces"; + protected static final String NAMESPACES_COLLECTION_NAME = "system.namespaces"; - private static final String INDEXES_COLLECTION_NAME = "system.indexes"; + protected static final String INDEXES_COLLECTION_NAME = "system.indexes"; private static final Logger log = LoggerFactory.getLogger(AbstractMongoDatabase.class); @@ -94,17 +94,17 @@ public String toString() { return getClass().getSimpleName() + "(" + getDatabaseName() + ")"; } - @Override - public Document handleCommand(Channel channel, String command, Document query, Oplog oplog) { + protected Document commandError(Channel channel, String command, Document query) { // getlasterror must not clear the last error if (command.equalsIgnoreCase("getlasterror")) { return commandGetLastError(channel, command, query); } else if (command.equalsIgnoreCase("reseterror")) { return commandResetError(channel); } + return null; + } - clearLastStatus(channel); - + protected Document handleSupportedCommand(Channel channel, String command, Document query, Oplog oplog) { if (command.equalsIgnoreCase("find")) { return commandFind(command, query); } else if (command.equalsIgnoreCase("insert")) { @@ -163,6 +163,18 @@ public Document handleCommand(Channel channel, String command, Document query, O throw new NoSuchCommandException(command); } + @Override + public Document handleCommand(Channel channel, String command, Document query, Oplog oplog) { + Document commandErrorDocument = commandError(channel, command, query); + if (commandErrorDocument != null) { + return commandErrorDocument; + } + + clearLastStatus(channel); + + return handleSupportedCommand(channel, command, query, oplog); + } + private Document listCollections() { List firstBatch = new ArrayList<>(); for (String namespace : listCollectionNamespaces()) { @@ -176,8 +188,7 @@ private Document listCollections() { collectionDescription.put("options", collectionOptions); collectionDescription.put("info", new Document("readOnly", false)); collectionDescription.put("type", "collection"); - collectionDescription.put("idIndex", getPrimaryKeyIndexDescription(namespace) - ); + collectionDescription.put("idIndex", getPrimaryKeyIndexDescription(namespace)); firstBatch.add(collectionDescription); } @@ -237,7 +248,7 @@ private static QueryParameters toQueryParameters(Document query) { return new QueryParameters(querySelector, numberToSkip, numberToReturn, batchSize, projection); } - private QueryParameters toQueryParameters(MongoQuery query, int numberToSkip, int batchSize) { + private static QueryParameters toQueryParameters(MongoQuery query, int numberToSkip, int batchSize) { return new QueryParameters(query.getQuery(), numberToSkip, 0, batchSize, query.getReturnFieldSelector()); } @@ -851,12 +862,12 @@ private Document updateDocuments(String collectionName, Document selector, return collection.updateDocuments(selector, update, arrayFilters, multi, upsert, oplog); } - private void putLastError(Channel channel, MongoServerException ex) { + protected void putLastError(Channel channel, MongoServerException ex) { Document error = toError(channel, ex); putLastResult(channel, error); } - private Document toWriteError(int index, MongoServerException e) { + protected Document toWriteError(int index, MongoServerException e) { Document error = new Document(); error.put("index", index); error.put("errmsg", e.getMessageWithoutErrorCode()); @@ -976,7 +987,7 @@ protected String getFullCollectionNamespace(String collectionName) { return getDatabaseName() + "." + collectionName; } - static boolean isSystemCollection(String collectionName) { + protected static boolean isSystemCollection(String collectionName) { return collectionName.startsWith("system."); } From e22409478176226ffc46078ed2d8bc3510a7ed56 Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Mon, 6 Jul 2020 13:50:17 -0400 Subject: [PATCH 6/8] Add method in AsyncMongoCollection Update MongoCollection --- .../main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java | 4 ++++ core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java index abde2b8ab..0020a1d99 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/AsyncMongoCollection.java @@ -1,11 +1,15 @@ package de.bwaldvogel.mongo; +import java.util.List; import java.util.concurrent.CompletionStage; import de.bwaldvogel.mongo.backend.QueryParameters; import de.bwaldvogel.mongo.backend.QueryResult; +import de.bwaldvogel.mongo.bson.Document; public interface AsyncMongoCollection { CompletionStage handleQueryAsync(QueryParameters queryData); + + CompletionStage> insertDocumentsAsync(List documents, boolean isOrdered); } diff --git a/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java index f10ea038d..066e4c877 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java @@ -78,6 +78,11 @@ default List insertDocuments(List documents) { List insertDocuments(List documents, boolean isOrdered); + @Override + default CompletionStage> insertDocumentsAsync(List documents, boolean isOrdered) { + return FutureUtils.wrap(() -> insertDocuments(documents, isOrdered)); + } + Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters, boolean isMulti, boolean isUpsert, Oplog oplog); From c3d6e17ed11699fd109071f36cc5063527b3337e Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Mon, 6 Jul 2020 13:58:16 -0400 Subject: [PATCH 7/8] Update AbstractMongoCollection Change method visibility to public for extensibility --- .../de/bwaldvogel/mongo/backend/AbstractMongoCollection.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java index d8570f602..e2c06ff0e 100755 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/AbstractMongoCollection.java @@ -469,7 +469,8 @@ public List insertDocuments(List documents, boolean isOrdere return writeErrors; } - private static Document toErrorDocument(MongoServerError e, int index) { + @VisibleForExternalBackends + public static Document toErrorDocument(MongoServerError e, int index) { Document error = new Document(); error.put("index", index); error.put("errmsg", e.getMessageWithoutErrorCode()); @@ -769,7 +770,7 @@ protected P findDocumentPosition(Document document) { protected abstract Stream> streamAllDocumentsWithPosition(); - private boolean isSystemCollection() { + protected boolean isSystemCollection() { return AbstractMongoDatabase.isSystemCollection(getCollectionName()); } From 8109dc39af89ca9ce72cba68d2ff392d01bb3c0a Mon Sep 17 00:00:00 2001 From: Justin Chu Date: Fri, 26 Jun 2020 23:59:08 -0400 Subject: [PATCH 8/8] Update Utils Change method visibility to public for extensibility --- core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java b/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java index 59bb8c329..aa7cae655 100644 --- a/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java +++ b/core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java @@ -166,7 +166,7 @@ static boolean nullAwareEquals(Object a, Object b) { } } - static int calculateSize(Document document) { + public static int calculateSize(Document document) { ByteBuf buffer = Unpooled.buffer(); try { BsonEncoder.encodeDocument(document, buffer); @@ -509,7 +509,7 @@ static Document firstBatchCursorResponse(String ns, Iterable documents return firstBatchCursorResponse(ns, firstBatch); } - static Document firstBatchCursorResponse(String ns, List firstBatch) { + public static Document firstBatchCursorResponse(String ns, List firstBatch) { return firstBatchCursorResponse(ns, firstBatch, EmptyCursor.get()); }