Skip to content
This repository was archived by the owner on May 15, 2025. It is now read-only.
Open

test #28

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.enterprise.cloudsearch.sdk.CheckpointCloseableIterable;
import com.google.enterprise.cloudsearch.sdk.ConnectorScheduler.OneAtATimeRunnable;
import com.google.enterprise.cloudsearch.sdk.IncrementalChangeHandler;
import com.google.enterprise.cloudsearch.sdk.InvalidConfigurationException;
import com.google.enterprise.cloudsearch.sdk.RepositoryException;
import com.google.enterprise.cloudsearch.sdk.StatsManager;
import com.google.enterprise.cloudsearch.sdk.config.Configuration;
import com.google.enterprise.cloudsearch.sdk.indexing.DefaultAcl;
import com.google.enterprise.cloudsearch.sdk.indexing.IndexingConnector;
Expand All @@ -48,6 +51,9 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -137,8 +143,11 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha
private RepositoryContext repositoryContext;
private CheckpointHandler checkpointHandler;
private boolean useQueues;
@VisibleForTesting QueueCheckpoint queueCheckpoint;
@VisibleForTesting
QueueCheckpoint queueCheckpoint;
private int partitionSize;
private ScheduledExecutorService scheduleExecutor;
private ExecutorService backgroundExecutor;

/**
* Creates an instance of {@link FullTraversalConnector} for performing full traversal over given
Expand All @@ -148,6 +157,12 @@ public class FullTraversalConnector implements IndexingConnector, IncrementalCha
*/
public FullTraversalConnector(Repository repository) {
this(repository, null);
scheduleExecutor =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("schedule").build());
backgroundExecutor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(false).setNameFormat("background").build());
}

/**
Expand Down Expand Up @@ -304,6 +319,30 @@ private interface GetDocsFunction {
CheckpointCloseableIterable<ApiOperation> apply(byte[] checkpoint) throws RepositoryException;
}

/**
* Runnable that when invoked executes the delegate with {@link #backgroundExecutor} and then
* returns before completion. That implies that uses of this class must ensure they do not add an
* instance directly to {@link #backgroundExecutor}, otherwise an odd infinite loop will
* occur.
*/
protected class BackgroundRunnable implements Runnable {

private final Runnable delegate;

public BackgroundRunnable(Runnable delegate) {
this.delegate = checkNotNull(delegate);
}

@Override
public void run() {
try {
backgroundExecutor.execute(delegate);
} catch (Throwable t) {
logger.log(Level.WARNING, "Failed to start background runnable", t);
}
}
}

/**
* Performs a repository traversal of a given type.
*
Expand All @@ -313,6 +352,13 @@ private interface GetDocsFunction {
private boolean doTraverse(String traversalType, String checkpointName, String queueName,
GetDocsFunction getDocs)
throws IOException, InterruptedException {

Runnable loggingStatsRunnable =
new BackgroundRunnable(
new OneAtATimeRunnable(
() -> logger.info(StatsManager.getInstance().printStats()), "StatsLog"));
scheduleExecutor.scheduleAtFixedRate(loggingStatsRunnable, 1, 5, TimeUnit.MINUTES);

logger.log(Level.INFO, "Begin {0} traversal.", traversalType);
ExecuteCounter executeCounter = new ExecuteCounter();
byte[] checkpoint = checkpointHandler.readCheckpoint(checkpointName);
Expand Down Expand Up @@ -462,7 +508,7 @@ public List<GenericJson> call() throws Exception {
*/
private List<GenericJson> executeOperation(
ApiOperation operation, ExecuteCounter executeCounter)
throws InterruptedException, IOException {
throws InterruptedException, IOException {
// most should be update item ops, but allow other ops (delete item, etc.)
executeCounter.incrementTotal();
String displayId = "[not an item update]";
Expand Down