Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.flogger.FluentLogger;
import com.scylladb.cdc.cql.WorkerCQL.Reader;
import com.scylladb.cdc.model.FutureUtils;
import com.scylladb.cdc.model.Timestamp;

abstract class TaskAction {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
Expand Down Expand Up @@ -180,7 +181,9 @@ public MoveToNextWindowTaskAction(WorkerConfiguration workerConfiguration, Task

@Override
public CompletableFuture<TaskAction> run() {
TaskState newState = task.state.moveToNextWindow(workerConfiguration.queryTimeWindowSizeMs);
Date now = Date.from(workerConfiguration.getClock().instant());

TaskState newState = task.state.moveToNextWindow(new Timestamp(now), workerConfiguration.confidenceWindowSizeMs, workerConfiguration.queryTimeWindowSizeMs);
workerConfiguration.transport.moveStateToNextWindow(task.id, newState);
Task newTask = task.updateState(newState);
return CompletableFuture.completedFuture(new ReadNewWindowTaskAction(workerConfiguration, newTask, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ public Timestamp getWindowEndTimestamp() {
return windowEnd;
}

public TaskState moveToNextWindow(long nextWindowSizeMs) {
return new TaskState(windowEnd, windowEnd.plus(nextWindowSizeMs, ChronoUnit.MILLIS), Optional.empty());
public TaskState moveToNextWindow(Timestamp now, long confidenceWindowSizeMs, long newQueryWindowSizeMs) {
Timestamp newWindowEnd = now.plus(-confidenceWindowSizeMs, ChronoUnit.MILLIS);

// Make sure that the window is at least newQueryWindowSizeMs long.
long windowLength = ChronoUnit.MILLIS.between(windowEnd.toDate().toInstant(), newWindowEnd.toDate().toInstant());
if (windowLength < newQueryWindowSizeMs) {
newWindowEnd = windowEnd.plus(newQueryWindowSizeMs, ChronoUnit.MILLIS);
}

return new TaskState(windowEnd, newWindowEnd, Optional.empty());
}

public TaskState update(ChangeId seen) {
Expand Down Expand Up @@ -86,12 +94,24 @@ public String toString() {
/*
* Creates an initial state for tasks in a given |generation|.
*
* Such initial state starts at the beginning of the generation and spans for
* |windowSizeMs| milliseconds.
* Such initial state starts at the beginning of the generation and spans
* until now minus confidence window size.
*/
public static TaskState createInitialFor(GenerationId generation, long windowSizeMs) {
Timestamp generationStart = generation.getGenerationStart();
return new TaskState(generationStart, generationStart.plus(windowSizeMs, ChronoUnit.MILLIS), Optional.empty());
public static TaskState createInitialFor(GenerationId generation, Timestamp now,
long confidenceWindowSizeMs, long queryTimeWindowSizeMs) {
// Start reading at generation start:
Timestamp windowStart = generation.getGenerationStart();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the conclusion of your tests @avelanarius? Does it make sense to take max(generation.getGenerationStart(), now - CDC ttl)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it made sense. The test was the following: create a table with small TTL (3 seconds), insert 10^6 rows, then do two types of selects: select of small window and select of a large window. Both of those queries returned no rows (3 second TTL...), but a select of small window was faster. Sometimes, select of a large window even caused timeouts (even though no rows were returned). We hypothesized that this is due to replicas sending "tombstones" to coordinator.


// Create a large window up to (now - confidenceWindowSizeMs), except
// when the window gets too small - in that case create a window
// queryTimeWindowSizeMs large (the consumer might need to wait a bit
// for the window to be ready for reading).
Timestamp windowEnd = now.plus(-confidenceWindowSizeMs, ChronoUnit.MILLIS);
if (windowEnd.compareTo(windowStart) < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this enough? Wouldn't we want to adjust windowEnd also when windowEnd - windowStart < queryTimeWindowSizeMs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, but this is only an initial window so it doesn't really matter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add this at the cost of additional complexity.

windowEnd = windowStart.plus(queryTimeWindowSizeMs, ChronoUnit.MILLIS);
}

return new TaskState(windowStart, windowEnd, Optional.empty());
}

/* If the state is before |minimumWindowStart| then this method returns a state
Expand All @@ -108,6 +128,12 @@ public TaskState trimTaskState(Timestamp minimumWindowStart, long windowSizeMs)
return new TaskState(minimumWindowStart, minimumWindowStart.plus(windowSizeMs, ChronoUnit.MILLIS), Optional.empty());
}

return this;
// Trim the start of the window with minimumWindowStart.
Timestamp newWindowStart = windowStart;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wasn't this needed before and now is? Or was it and it was a bug we weren't trimming the start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously it only handled one case:

---- WINDOW ---     | TTL

->

                    ---- WINDOW ---

Now it also handles this case:

----      WINDOW           ---     
                    | TTL
->

                    - WINDOW -     
                    | TTL

Previously it didn't really matter to do the second type of trimming, because the windows were very small (default 30 seconds).

if (newWindowStart.compareTo(minimumWindowStart) < 0) {
newWindowStart = minimumWindowStart;
}

return new TaskState(newWindowStart, windowEnd, lastConsumedChangeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ private static GenerationId getGenerationIdOfStreams(Map<TaskId, SortedSet<Strea
* All streams are assumed to belong to the same generation and the initial
* state is build based on the ID of this generation.
*/
private static TaskState getInitialStateForStreams(Map<TaskId, SortedSet<StreamId>> groupedStreams,
long windowSizeMs) {
return TaskState.createInitialFor(getGenerationIdOfStreams(groupedStreams), windowSizeMs);
private TaskState getInitialStateForStreams(Map<TaskId, SortedSet<StreamId>> groupedStreams) {
Timestamp now = new Timestamp(Date.from(workerConfiguration.getClock().instant()));

return TaskState.createInitialFor(getGenerationIdOfStreams(groupedStreams), now, workerConfiguration.confidenceWindowSizeMs, workerConfiguration.queryTimeWindowSizeMs);
}

/*
Expand All @@ -64,7 +65,7 @@ private static TaskState getInitialStateForStreams(Map<TaskId, SortedSet<StreamI
*/
private Stream<Task> createTasksWithState(Map<TaskId, SortedSet<StreamId>> groupedStreams) throws ExecutionException, InterruptedException {
Map<TaskId, TaskState> states = workerConfiguration.transport.getTaskStates(groupedStreams.keySet());
TaskState initialState = getInitialStateForStreams(groupedStreams, workerConfiguration.queryTimeWindowSizeMs);
TaskState initialState = getInitialStateForStreams(groupedStreams);

Set<TableName> tableNames = groupedStreams.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet());
Date now = Date.from(workerConfiguration.getClock().instant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public boolean isReaderFinished(Task task) {
return finishedReaders.contains(task);
}

public Set<Task> getFinishedReaders() {
return finishedReaders;
}

public void setCQLErrorStrategy(ErrorInject errorStrategy) {
this.cqlErrorStrategy = Preconditions.checkNotNull(errorStrategy);
}
Expand Down
Loading