Skip to content

Commit

Permalink
Backport of Hash based config entry replication into release/1.16.x (#…
Browse files Browse the repository at this point in the history
…19915)

add hash based config entry replication

Co-authored-by: Dhia Ayachi <[email protected]>
  • Loading branch information
hc-github-team-consul-core and dhiaayachi authored Dec 12, 2023
1 parent d07394f commit dcb4645
Show file tree
Hide file tree
Showing 25 changed files with 1,371 additions and 852 deletions.
3 changes: 3 additions & 0 deletions .changelog/19795.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
wan-federation: use a hash to diff config entries when replicating in the secondary DC to avoid unnecessary writes..
```
6 changes: 6 additions & 0 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6094,6 +6094,9 @@ func (tc testCase) run(format string, dataDir string) func(t *testing.T) {
expected.ACLResolverSettings.NodeName = expected.NodeName
expected.ACLResolverSettings.EnterpriseMeta = *structs.NodeEnterpriseMetaInPartition(expected.PartitionOrDefault())

for i, e := range expected.ConfigEntryBootstrap {
e.SetHash(actual.ConfigEntryBootstrap[i].GetHash())
}
prototest.AssertDeepEqual(t, expected, actual, cmpopts.EquateEmpty())
if tc.cleanup != nil {
tc.cleanup()
Expand Down Expand Up @@ -6977,6 +6980,9 @@ func TestLoad_FullConfig(t *testing.T) {
time.Date(2019, 11, 20, 5, 0, 0, 0, time.UTC)))
r, err := Load(opts)
require.NoError(t, err)
for i, e := range expected.ConfigEntryBootstrap {
e.SetHash(r.RuntimeConfig.ConfigEntryBootstrap[i].GetHash())
}
prototest.AssertDeepEqual(t, expected, r.RuntimeConfig)
require.ElementsMatch(t, expectedWarns, r.Warnings, "Warnings: %#v", r.Warnings)
})
Expand Down
2 changes: 2 additions & 0 deletions agent/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestConfig_Get(t *testing.T) {
ce.CreateIndex = 12
ce.ModifyIndex = 13
ce.EnterpriseMeta = acl.EnterpriseMeta{}
ce.Hash = 0

out, err := a.srv.marshalJSON(req, obj)
require.NoError(t, err)
Expand Down Expand Up @@ -450,6 +451,7 @@ func TestConfig_Apply_IngressGateway(t *testing.T) {
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
Hash: got.GetHash(),
}
require.Equal(t, expect, got)
}
Expand Down
7 changes: 7 additions & 0 deletions agent/configentry/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,10 @@ func EqualID(e1, e2 structs.ConfigEntry) bool {
e1.GetEnterpriseMeta().IsSame(e2.GetEnterpriseMeta()) &&
e1.GetName() == e2.GetName()
}

func SameHash(e1, e2 structs.ConfigEntry) bool {
if e1.GetHash() == 0 || e2.GetHash() == 0 {
return false
}
return e1.GetHash() == e2.GetHash()
}
4 changes: 3 additions & 1 deletion agent/consul/config_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func diffConfigEntries(local []structs.ConfigEntry, remote []structs.ConfigEntry
if configentry.EqualID(local[localIdx], remote[remoteIdx]) {
// config is in both the local and remote state - need to check raft indices
if remote[remoteIdx].GetRaftIndex().ModifyIndex > lastRemoteIndex {
updates = append(updates, remote[remoteIdx])
if !configentry.SameHash(local[localIdx], remote[remoteIdx]) {
updates = append(updates, remote[remoteIdx])
}
}
// increment both indices when equal
localIdx += 1
Expand Down
62 changes: 62 additions & 0 deletions agent/consul/config_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package consul
import (
"context"
"fmt"
"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/assert"
"os"
"testing"

Expand Down Expand Up @@ -268,3 +270,63 @@ func TestReplication_ConfigEntries_GraphValidationErrorDuringReplication(t *test
checkSame(r)
})
}

func createConfigEntries(num int, indexStart int) []structs.ConfigEntry {
entries := make([]structs.ConfigEntry, num)
for i := range entries {
entries[i] = &structs.ServiceConfigEntry{Name: ulid.Make().String(), RaftIndex: structs.RaftIndex{ModifyIndex: uint64(i + indexStart)}}
}
return entries
}

func mutateIDs(e []structs.ConfigEntry, indexStart int) []structs.ConfigEntry {
entries := make([]structs.ConfigEntry, len(e))
for i := range entries {
entries[i] = &structs.ServiceConfigEntry{Name: e[i].GetName(), RaftIndex: structs.RaftIndex{ModifyIndex: uint64(i + indexStart)}}
}
return entries
}

func Test_diffConfigEntries(t *testing.T) {
type args struct {
local []structs.ConfigEntry
remote []structs.ConfigEntry
lastRemoteIndex uint64
normalize bool
}

entries1 := createConfigEntries(10, 10)
entries2 := createConfigEntries(10, 20)
entries3 := append(entries1, entries2...)
entries4 := mutateIDs(entries1, 20)
entries5 := mutateIDs(entries1, 0)
tests := []struct {
name string
args args
updated []structs.ConfigEntry
deleted []structs.ConfigEntry
}{
{"empty", args{local: make([]structs.ConfigEntry, 0), remote: make([]structs.ConfigEntry, 0), lastRemoteIndex: 0, normalize: true}, nil, nil},
{"same", args{local: entries1, remote: entries1, lastRemoteIndex: 0, normalize: true}, nil, nil},
{"new remote", args{local: nil, remote: entries1, lastRemoteIndex: 0, normalize: true}, entries1, nil},
{"extra remote", args{local: entries1, remote: entries3, lastRemoteIndex: 0, normalize: true}, entries2, nil},
{"extra local", args{local: entries3, remote: entries1, lastRemoteIndex: 0, normalize: true}, nil, entries2},
{"same, same size, different raft ID", args{local: entries1, remote: entries4, lastRemoteIndex: 0, normalize: true}, nil, nil},
{"when hash is empty, avoid hash compare", args{local: entries5, remote: entries4, lastRemoteIndex: 0, normalize: false}, entries4, nil},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.args.normalize {
for _, l := range tt.args.local {
require.NoError(t, l.Normalize())
}
for _, r := range tt.args.remote {
require.NoError(t, r.Normalize())
}
}
deletions, updates := diffConfigEntries(tt.args.local, tt.args.remote, tt.args.lastRemoteIndex)
assert.Equalf(t, tt.updated, updates, "updated diffConfigEntries(%v, %v, %v)", tt.args.local, tt.args.remote, tt.args.lastRemoteIndex)
assert.Equalf(t, tt.deleted, deletions, "deleted diffConfigEntries(%v, %v, %v)", tt.args.local, tt.args.remote, tt.args.lastRemoteIndex)
})
}
}
5 changes: 5 additions & 0 deletions agent/consul/fsm/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,14 +815,17 @@ func TestFSM_SnapshotRestore_CE(t *testing.T) {
// Verify config entries are restored
_, serviceConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceDefaults, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
serviceConfig.SetHash(serviceConfEntry.GetHash())
require.Equal(t, serviceConfig, serviceConfEntry)

_, proxyConfEntry, err := fsm2.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
proxyConfig.SetHash(proxyConfEntry.GetHash())
require.Equal(t, proxyConfig, proxyConfEntry)

_, ingressRestored, err := fsm2.state.ConfigEntry(nil, structs.IngressGateway, "ingress", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
ingress.SetHash(ingressRestored.GetHash())
require.Equal(t, ingress, ingressRestored)

_, restoredGatewayServices, err := fsm2.state.GatewayServices(nil, "ingress", structs.DefaultEnterpriseMetaInDefaultPartition())
Expand Down Expand Up @@ -856,11 +859,13 @@ func TestFSM_SnapshotRestore_CE(t *testing.T) {
// Verify service-intentions is restored
_, serviceIxnEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceIntentions, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
serviceIxn.SetHash(serviceIxnEntry.GetHash())
require.Equal(t, serviceIxn, serviceIxnEntry)

// Verify mesh config entry is restored
_, meshConfigEntry, err := fsm2.state.ConfigEntry(nil, structs.MeshConfig, structs.MeshConfigMesh, structs.DefaultEnterpriseMetaInDefaultPartition())
require.NoError(t, err)
meshConfig.SetHash(meshConfigEntry.GetHash())
require.Equal(t, meshConfig, meshConfigEntry)

_, restoredServiceNames, err := fsm2.state.ServiceNamesOfKind(nil, structs.ServiceKindTypical)
Expand Down
3 changes: 3 additions & 0 deletions agent/consul/intention_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ func TestIntentionApply_WithoutIDs(t *testing.T) {
},
},
RaftIndex: entry.RaftIndex,
Hash: entry.GetHash(),
}

require.Equal(t, expect, entry)
Expand Down Expand Up @@ -689,6 +690,7 @@ func TestIntentionApply_WithoutIDs(t *testing.T) {
},
},
RaftIndex: entry.RaftIndex,
Hash: entry.GetHash(),
}

require.Equal(t, expect, entry)
Expand Down Expand Up @@ -758,6 +760,7 @@ func TestIntentionApply_WithoutIDs(t *testing.T) {
},
},
RaftIndex: entry.RaftIndex,
Hash: entry.GetHash(),
}

require.Equal(t, expect, entry)
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2084,8 +2084,8 @@ func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
LegacyUpdateTime: got.Sources[0].LegacyUpdateTime,
},
},

RaftIndex: got.RaftIndex,
Hash: got.GetHash(),
}

require.Equal(t, expect, got)
Expand Down
7 changes: 7 additions & 0 deletions agent/consul/state/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {

// ConfigEntry is used when restoring from a snapshot.
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
// the hash is recalculated when restoring config entries
// in case a new field is added in a newer version.
h, err := structs.HashConfigEntry(c)
if err != nil {
return err
}
c.SetHash(h)
return insertConfigEntryWithTxn(s.tx, c.GetRaftIndex().ModifyIndex, c)
}

Expand Down
1 change: 1 addition & 0 deletions agent/consul/state/intention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func testStore_IntentionMutation(t *testing.T, s *Store) {
src.LegacyMeta = nil
}
}
expect.SetHash(got.GetHash())
require.Equal(t, expect, got)
}

Expand Down
43 changes: 41 additions & 2 deletions agent/structs/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ type ConfigEntry interface {
GetMeta() map[string]string
GetEnterpriseMeta() *acl.EnterpriseMeta
GetRaftIndex() *RaftIndex
GetHash() uint64
SetHash(h uint64)
}

func HashConfigEntry(conf ConfigEntry) (uint64, error) {
hash, err := hashstructure.Hash(conf, nil)
if err != nil {
return hash, err
}
return hash, nil
}

// ControlledConfigEntry is an optional interface implemented by a ConfigEntry
Expand Down Expand Up @@ -168,8 +178,17 @@ type ServiceConfigEntry struct {
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`

Meta map[string]string `json:",omitempty"`
Hash uint64 `json:",omitempty" hash:"ignore"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
RaftIndex `hash:"ignore"`
}

func (e *ServiceConfigEntry) SetHash(h uint64) {
e.Hash = h
}

func (e *ServiceConfigEntry) GetHash() uint64 {
return e.Hash
}

func (e *ServiceConfigEntry) Clone() *ServiceConfigEntry {
Expand Down Expand Up @@ -224,6 +243,11 @@ func (e *ServiceConfigEntry) Normalize() error {
}
}
}
h, err := HashConfigEntry(e)
if err != nil {
return err
}
e.Hash = h

return validationErr
}
Expand Down Expand Up @@ -409,8 +433,17 @@ type ProxyConfigEntry struct {
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`

Meta map[string]string `json:",omitempty"`
Hash uint64 `json:",omitempty" hash:"ignore"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
RaftIndex `hash:"ignore"`
}

func (e *ProxyConfigEntry) SetHash(h uint64) {
e.Hash = h
}

func (e *ProxyConfigEntry) GetHash() uint64 {
return e.Hash
}

func (e *ProxyConfigEntry) GetKind() string {
Expand Down Expand Up @@ -449,7 +482,13 @@ func (e *ProxyConfigEntry) Normalize() error {

e.EnterpriseMeta.Normalize()

h, err := HashConfigEntry(e)
if err != nil {
return err
}
e.Hash = h
return nil

}

func (e *ProxyConfigEntry) Validate() error {
Expand Down
48 changes: 45 additions & 3 deletions agent/structs/config_entry_discoverychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,17 @@ type ServiceRouterConfigEntry struct {
Routes []ServiceRoute

Meta map[string]string `json:",omitempty"`
Hash uint64 `json:",omitempty" hash:"ignore"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
RaftIndex `hash:"ignore"`
}

func (e *ServiceRouterConfigEntry) SetHash(h uint64) {
e.Hash = h
}

func (e *ServiceRouterConfigEntry) GetHash() uint64 {
return e.Hash
}

func (e *ServiceRouterConfigEntry) GetKind() string {
Expand Down Expand Up @@ -129,6 +138,11 @@ func (e *ServiceRouterConfigEntry) Normalize() error {
}
}

h, err := HashConfigEntry(e)
if err != nil {
return err
}
e.Hash = h
return nil
}

Expand Down Expand Up @@ -537,8 +551,17 @@ type ServiceSplitterConfigEntry struct {
Splits []ServiceSplit

Meta map[string]string `json:",omitempty"`
Hash uint64 `json:",omitempty" hash:"ignore"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
RaftIndex `hash:"ignore"`
}

func (e *ServiceSplitterConfigEntry) SetHash(h uint64) {
e.Hash = h
}

func (e *ServiceSplitterConfigEntry) GetHash() uint64 {
return e.Hash
}

func (e *ServiceSplitterConfigEntry) GetKind() string {
Expand Down Expand Up @@ -581,6 +604,11 @@ func (e *ServiceSplitterConfigEntry) Normalize() error {
}
}

h, err := HashConfigEntry(e)
if err != nil {
return err
}
e.Hash = h
return nil
}

Expand Down Expand Up @@ -876,8 +904,17 @@ type ServiceResolverConfigEntry struct {
LoadBalancer *LoadBalancer `json:",omitempty" alias:"load_balancer"`

Meta map[string]string `json:",omitempty"`
Hash uint64 `json:",omitempty" hash:"ignore"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
RaftIndex `hash:"ignore"`
}

func (e *ServiceResolverConfigEntry) SetHash(h uint64) {
e.Hash = h
}

func (e *ServiceResolverConfigEntry) GetHash() uint64 {
return e.Hash
}

func (e *ServiceResolverConfigEntry) RelatedPeers() []string {
Expand Down Expand Up @@ -998,6 +1035,11 @@ func (e *ServiceResolverConfigEntry) Normalize() error {

e.EnterpriseMeta.Normalize()

h, err := HashConfigEntry(e)
if err != nil {
return err
}
e.Hash = h
return nil
}

Expand Down
Loading

0 comments on commit dcb4645

Please sign in to comment.