-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-24069 Turn the pending assignments into a queue #5244
base: main
Are you sure you want to change the base?
Conversation
@@ -200,7 +201,14 @@ public static CompletableFuture<Void> updatePendingAssignmentsKeys( | |||
|
|||
boolean isNewAssignments = !tableCfgPartAssignments.equals(partAssignments); | |||
|
|||
byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments, assignmentsTimestamp); | |||
Assignments partAssignmentsPlanned = Assignments.of(partAssignments, assignmentsTimestamp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why "planned"?
And let's introduce new name for assignments that will be last in the pending queue, for example "target assignments" or something like that, to compare them with stable like it is done below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated - partAssignmentsPendingQueue
is formed by PendingAssignmentsCalculator
by providing current stable and planned assignments
@@ -338,7 +339,7 @@ private static CompletableFuture<Integer> partitionUpdate( | |||
Iif invokeClosure = prepareMsInvokeClosure( | |||
partId, | |||
longToBytesKeepingOrder(revision), | |||
Assignments.forced(Set.of(nextAssignment), assignmentsTimestamp).toBytes(), | |||
new AssignmentsQueue(Assignments.forced(Set.of(nextAssignment), assignmentsTimestamp)).toBytes(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not going to remain like that in the final code after epic is done. We have discussed that you will need some method/class calculating the pending queue from current stable and target assignments. What do you think about adding the naive version of this class right now (adding just one element to the queue), to not touch this and similar places in code again when doing your future tickets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added PendingAssignmentsCalculator
* @return the head of this queue, or {@code Assignments.EMPTY} if this queue is empty | ||
*/ | ||
public Assignments poll() { | ||
return Objects.requireNonNullElse(queue.poll(), Assignments.EMPTY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure returning EMPTY is the best way to deal with an empty queue. The empty queue seems to be an unexpected case, so we can place an assertion here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import org.jetbrains.annotations.Nullable; | ||
|
||
/** | ||
* Class that encapsulates a queue with consequent configuration switches between planned and stable configuration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Class that encapsulates a queue with consequent configuration switches between planned and stable configuration | |
* Class that encapsulates a queue with consequent configuration switches between pending and stable configuration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed mention of planned/pending assignments to avoid confusion
* <li> node removed from configuration | ||
* <li> node added to configuration with different type (peer or learner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls add indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
assert partAssignmentsPlanned.equals(partAssignmentsPending.peekLast()); | ||
|
||
byte[] partAssignmentsPlannedBytes = partAssignmentsPlanned.toBytes(); | ||
byte[] partAssignmentsBytes = partAssignmentsPending.toBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also go to RebalanceUtil#PENDING_ASSIGNMENTS_PREFIX and RebalanceUtil#pendingPartAssignmentsKey and rename them and change their javadocs, to make them different from stable and planned assignment keys and prefixes and denote that it is not a set of assignments, but queue of sets. Also, there is a link to .md file which also tells about pending assignments, it also needs to be fixed. Same for zone keys.
Also, please check the comments in RebalanceUtil/RebalanceUtilEx/ZoneRebalanceUtil/RebalanceRaftGroupEventsListener, etc. which describe the meta storage operations in pseudocode, for example, things like this may be confusing:
if partition.assignments.pending != calcPartAssignments
Logging also became invalid, for example, in RebalanceUtil#updatePendingAssignmentsKeys:
case PENDING_KEY_UPDATED:
LOG.info(
"Update metastore pending partitions key [key={}, partition={}, table={}/{}, newVal={}]",
partAssignmentsPendingKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
partAssignments);
partAssignments is not a pending queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefixes renamed - e.g. PENDING_ASSIGNMENTS_QUEUE_PREFIX
, rebalance.md
and pseudocode comments updated, logs updated
@@ -183,7 +193,8 @@ private WatchListener createPendingAssignmentsListener() { | |||
private static void handleReceivedAssignments( | |||
WatchEvent event, | |||
byte[] assignmentsMetastoreKeyPrefix, | |||
Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap | |||
Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap, | |||
Function<byte[], Set<Assignment>> deserFunction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
up to you, but I would prefer deserializationFunction
or deserializer
, because we dont usually use such contractions and there is no javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* Reads assignments from the metastorage locally. The resulting map is a {@code tableId -> {partitionId -> assignments}} mapping. | ||
*/ | ||
Map<Integer, Map<Integer, Assignments>> readAssignments(byte[] prefix, long appliedRevision) { | ||
Map<Integer, Map<Integer, Assignments>> readAssignments(byte[] prefix, long appliedRevision, Function<byte[], Assignments> valDeser) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same about contraction :) I suggest deserializer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
# Conflicts: # modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
https://issues.apache.org/jira/browse/IGNITE-24069
Assignments
toAssignmentsQueue
with own versioned serializer