Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package de.bwaldvogel.mongo;

import java.util.List;
import java.util.concurrent.CompletionStage;

import de.bwaldvogel.mongo.backend.QueryParameters;
import de.bwaldvogel.mongo.backend.QueryResult;
import de.bwaldvogel.mongo.bson.Document;

public interface AsyncMongoCollection {

CompletionStage<QueryResult> handleQueryAsync(QueryParameters queryData);

CompletionStage<List<Document>> insertDocumentsAsync(List<Document> documents, boolean isOrdered);
}
5 changes: 5 additions & 0 deletions core/src/main/java/de/bwaldvogel/mongo/MongoCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ default List<Document> insertDocuments(List<Document> documents) {

List<Document> insertDocuments(List<Document> documents, boolean isOrdered);

@Override
default CompletionStage<List<Document>> insertDocumentsAsync(List<Document> documents, boolean isOrdered) {
return FutureUtils.wrap(() -> insertDocuments(documents, isOrdered));
}

Document updateDocuments(Document selector, Document update, ArrayFilters arrayFilters,
boolean isMulti, boolean isUpsert, Oplog oplog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -33,7 +32,6 @@
import de.bwaldvogel.mongo.oplog.CollectionBackedOplog;
import de.bwaldvogel.mongo.oplog.NoopOplog;
import de.bwaldvogel.mongo.oplog.Oplog;
import de.bwaldvogel.mongo.util.FutureUtils;
import de.bwaldvogel.mongo.wire.BsonConstants;
import de.bwaldvogel.mongo.wire.MongoWireProtocolHandler;
import de.bwaldvogel.mongo.wire.message.Message;
Expand All @@ -52,7 +50,7 @@ public abstract class AbstractMongoBackend implements MongoBackend {

protected static final String OPLOG_COLLECTION_NAME = "oplog.rs";

static final String ADMIN_DB_NAME = "admin";
protected static final String ADMIN_DB_NAME = "admin";

private final Map<String, MongoDatabase> databases = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -150,7 +148,7 @@ private Document getLog(String argument) {
return response;
}

private Document handleAdminCommand(String command, Document query) {
protected Document handleAdminCommand(String command, Document query) {
if (command.equalsIgnoreCase("listdatabases")) {
List<Document> databases = listDatabaseNames().stream()
.sorted()
Expand Down Expand Up @@ -298,8 +296,7 @@ private MongoCollection<?> resolveCollection(final String namespace) {

protected abstract MongoDatabase openOrCreateDatabase(String databaseName);

// handle command synchronously
private Document handleCommandSync(Channel channel, String databaseName, String command, Document query) {
protected Document handleCommandSync(Channel channel, String databaseName, String command, Document query) {
if (command.equalsIgnoreCase("whatsmyuri")) {
Document response = new Document();
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
Expand Down Expand Up @@ -334,9 +331,9 @@ private Document handleCommandSync(Channel channel, String databaseName, String

@Override
public Document handleCommand(Channel channel, String databaseName, String command, Document query) {
Document commandSync = handleCommandSync(channel, databaseName, command, query);
if (commandSync != null) {
return commandSync;
Document commandResponse = handleCommandSync(channel, databaseName, command, query);
if (commandResponse != null) {
return commandResponse;
}

if (databaseName.equals(ADMIN_DB_NAME)) {
Expand All @@ -346,36 +343,6 @@ public Document handleCommand(Channel channel, String databaseName, String comma
return resolveDatabase(databaseName).handleCommand(channel, command, query, oplog);
}

@Override
public CompletionStage<Document> handleCommandAsync(Channel channel, String database,
String command, Document query) {
if ("dropDatabase".equalsIgnoreCase(command)) {
return dropDatabaseAsync(database)
.handle((aVoid, ex) -> {
Document response = new Document("dropped", database);
if (ex != null) {
response.put("errmsg", ex.getMessage());
response.put("ok", 0.0);
log.error("dropDatabase " + database + " error!", ex);
} else {
Utils.markOkay(response);
}
return response;
});
}

Document commandSync = handleCommandSync(channel, database, command, query);
if (commandSync != null) {
return FutureUtils.wrap(() -> commandSync);
}

if (database.equals(ADMIN_DB_NAME)) {
return FutureUtils.wrap(() -> handleAdminCommand(command, query));
}

return resolveDatabase(database).handleCommandAsync(channel, command, query, oplog);
}

@Override
public Collection<Document> getCurrentOperations(MongoQuery query) {
// TODO
Expand All @@ -384,12 +351,8 @@ public Collection<Document> getCurrentOperations(MongoQuery query) {

@Override
public QueryResult handleQuery(MongoQuery query) {
return resolveDatabase(query).handleQuery(query);
}

@Override
public CompletionStage<QueryResult> handleQueryAsync(MongoQuery query) {
return resolveDatabase(query).handleQueryAsync(query);
MongoDatabase db = resolveDatabase(query);
return db.handleQuery(query);
}

@Override
Expand All @@ -410,32 +373,20 @@ public QueryResult handleGetMore(MongoGetMore getMore) {

@Override
public void handleInsert(MongoInsert insert) {
resolveDatabase(insert).handleInsert(insert, oplog);
}

@Override
public CompletionStage<Void> handleInsertAsync(MongoInsert insert) {
return resolveDatabase(insert).handleInsertAsync(insert, oplog);
MongoDatabase db = resolveDatabase(insert);
db.handleInsert(insert, oplog);
}

@Override
public void handleDelete(MongoDelete delete) {
resolveDatabase(delete).handleDelete(delete, oplog);
}

@Override
public CompletionStage<Void> handleDeleteAsync(MongoDelete delete) {
return resolveDatabase(delete).handleDeleteAsync(delete, oplog);
MongoDatabase db = resolveDatabase(delete);
db.handleDelete(delete, oplog);
}

@Override
public void handleUpdate(MongoUpdate update) {
resolveDatabase(update).handleUpdate(update, oplog);
}

@Override
public CompletionStage<Void> handleUpdateAsync(MongoUpdate update) {
return resolveDatabase(update).handleUpdateAsync(update, oplog);
MongoDatabase db = resolveDatabase(update);
db.handleUpdate(update, oplog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ public List<Document> insertDocuments(List<Document> documents, boolean isOrdere
return writeErrors;
}

private static Document toErrorDocument(MongoServerError e, int index) {
@VisibleForExternalBackends
public static Document toErrorDocument(MongoServerError e, int index) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected would be enough.
Please add the @VisibleForExternalBackends to indicate why it’s not a private method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VisibleForExternalBackends added.

Document error = new Document();
error.put("index", index);
error.put("errmsg", e.getMessageWithoutErrorCode());
Expand Down Expand Up @@ -769,7 +770,7 @@ protected P findDocumentPosition(Document document) {

protected abstract Stream<DocumentWithPosition<P>> streamAllDocumentsWithPosition();

private boolean isSystemCollection() {
protected boolean isSystemCollection() {
return AbstractMongoDatabase.isSystemCollection(getCollectionName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -32,7 +31,6 @@
import de.bwaldvogel.mongo.exception.NoSuchCommandException;
import de.bwaldvogel.mongo.oplog.NoopOplog;
import de.bwaldvogel.mongo.oplog.Oplog;
import de.bwaldvogel.mongo.util.FutureUtils;
import de.bwaldvogel.mongo.wire.message.MongoDelete;
import de.bwaldvogel.mongo.wire.message.MongoInsert;
import de.bwaldvogel.mongo.wire.message.MongoQuery;
Expand All @@ -41,9 +39,9 @@

public abstract class AbstractMongoDatabase<P> implements MongoDatabase {

private static final String NAMESPACES_COLLECTION_NAME = "system.namespaces";
protected static final String NAMESPACES_COLLECTION_NAME = "system.namespaces";

private static final String INDEXES_COLLECTION_NAME = "system.indexes";
protected static final String INDEXES_COLLECTION_NAME = "system.indexes";

private static final Logger log = LoggerFactory.getLogger(AbstractMongoDatabase.class);

Expand Down Expand Up @@ -96,7 +94,7 @@ public String toString() {
return getClass().getSimpleName() + "(" + getDatabaseName() + ")";
}

private Document commandError(Channel channel, String command, Document query) {
protected Document commandError(Channel channel, String command, Document query) {
// getlasterror must not clear the last error
if (command.equalsIgnoreCase("getlasterror")) {
return commandGetLastError(channel, command, query);
Expand All @@ -106,8 +104,7 @@ private Document commandError(Channel channel, String command, Document query) {
return null;
}

// handle command synchronously
private Document handleCommandSync(Channel channel, String command, Document query, Oplog oplog) {
protected Document handleSupportedCommand(Channel channel, String command, Document query, Oplog oplog) {
if (command.equalsIgnoreCase("find")) {
return commandFind(command, query);
} else if (command.equalsIgnoreCase("insert")) {
Expand Down Expand Up @@ -175,23 +172,7 @@ public Document handleCommand(Channel channel, String command, Document query, O

clearLastStatus(channel);

return handleCommandSync(channel, command, query, oplog);
}

@Override
public CompletionStage<Document> handleCommandAsync(Channel channel, String command, Document query, Oplog oplog) {
Document commandErrorDocument = commandError(channel, command, query);
if (commandErrorDocument != null) {
return FutureUtils.wrap(() -> commandErrorDocument);
}

clearLastStatus(channel);

if ("find".equalsIgnoreCase(command)) {
return commandFindAsync(command, query);
}

return FutureUtils.wrap(() -> handleCommandSync(channel, command, query, oplog));
return handleSupportedCommand(channel, command, query, oplog);
}

private Document listCollections() {
Expand Down Expand Up @@ -254,18 +235,6 @@ private Document commandFind(String command, Document query) {
return toCursorResponse(collection, queryResult);
}

private CompletionStage<Document> commandFindAsync(String command, Document query) {
String collectionName = (String) query.get(command);
MongoCollection<P> collection = resolveCollection(collectionName, false);
if (collection == null) {
return FutureUtils.wrap(() -> Utils.firstBatchCursorResponse(getFullCollectionNamespace(collectionName),
Collections.emptyList()));
}
QueryParameters queryParameters = toQueryParameters(query);
return collection.handleQueryAsync(queryParameters)
.thenApply(queryResult -> toCursorResponse(collection, queryResult));
}

private static QueryParameters toQueryParameters(Document query) {
int numberToSkip = ((Number) query.getOrDefault("skip", 0)).intValue();
int numberToReturn = ((Number) query.getOrDefault("limit", 0)).intValue();
Expand All @@ -279,7 +248,7 @@ private static QueryParameters toQueryParameters(Document query) {
return new QueryParameters(querySelector, numberToSkip, numberToReturn, batchSize, projection);
}

private QueryParameters toQueryParameters(MongoQuery query, int numberToSkip, int batchSize) {
private static QueryParameters toQueryParameters(MongoQuery query, int numberToSkip, int batchSize) {
return new QueryParameters(query.getQuery(), numberToSkip, 0, batchSize, query.getReturnFieldSelector());
}

Expand Down Expand Up @@ -648,26 +617,6 @@ public QueryResult handleQuery(MongoQuery query) {
return collection.handleQuery(queryParameters);
}

@Override
public CompletionStage<QueryResult> handleQueryAsync(MongoQuery query) {
clearLastStatus(query.getChannel());
String collectionName = query.getCollectionName();
MongoCollection<P> collection = resolveCollection(collectionName, false);
if (collection == null) {
return FutureUtils.wrap(QueryResult::new);
}
int numberToSkip = query.getNumberToSkip();
int batchSize = query.getNumberToReturn();

if (batchSize < -1) {
// actually: request to close cursor automatically
batchSize = -batchSize;
}

QueryParameters queryData = toQueryParameters(query, numberToSkip, batchSize);
return collection.handleQueryAsync(queryData);
}

@Override
public void handleClose(Channel channel) {
lastResults.remove(channel);
Expand Down Expand Up @@ -913,12 +862,12 @@ private Document updateDocuments(String collectionName, Document selector,
return collection.updateDocuments(selector, update, arrayFilters, multi, upsert, oplog);
}

private void putLastError(Channel channel, MongoServerException ex) {
protected void putLastError(Channel channel, MongoServerException ex) {
Document error = toError(channel, ex);
putLastResult(channel, error);
}

private Document toWriteError(int index, MongoServerException e) {
protected Document toWriteError(int index, MongoServerException e) {
Document error = new Document();
error.put("index", index);
error.put("errmsg", e.getMessageWithoutErrorCode());
Expand Down Expand Up @@ -1031,14 +980,14 @@ public void moveCollection(MongoDatabase oldDatabase, MongoCollection<?> collect
new Document("$set", new Document("ns", newCollection.getFullName())),
ArrayFilters.empty(), true, false, NoopOplog.get());

namespaces.insertDocuments(newDocuments, true);
namespaces.insertDocuments(newDocuments);
}

protected String getFullCollectionNamespace(String collectionName) {
return getDatabaseName() + "." + collectionName;
}

static boolean isSystemCollection(String collectionName) {
protected static boolean isSystemCollection(String collectionName) {
return collectionName.startsWith("system.");
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/de/bwaldvogel/mongo/backend/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ static boolean nullAwareEquals(Object a, Object b) {
}
}

static int calculateSize(Document document) {
public static int calculateSize(Document document) {
ByteBuf buffer = Unpooled.buffer();
try {
BsonEncoder.encodeDocument(document, buffer);
Expand Down Expand Up @@ -509,7 +509,7 @@ static Document firstBatchCursorResponse(String ns, Iterable<Document> documents
return firstBatchCursorResponse(ns, firstBatch);
}

static Document firstBatchCursorResponse(String ns, List<Document> firstBatch) {
public static Document firstBatchCursorResponse(String ns, List<Document> firstBatch) {
return firstBatchCursorResponse(ns, firstBatch, EmptyCursor.get());
}

Expand Down
Loading