Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions src/keri/core/kraming.py
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ def kramit(self, msg, **kwa):
else:
raise KramError("Unexpected auth type while kraming.")

def changeConfig(self, newCf):
def changeConfig(self, newCf, deltaOverride=None):
"""
Apply a new cache‑type configuration using full Case‑3 (see KRAM specs), coverage‑aware
semantics. This method enforces all KRAM invariants for safe dynamic
Expand Down Expand Up @@ -1242,6 +1242,16 @@ def changeConfig(self, newCf):
dictionary containing the new cache‑type configuration under:
config["kram"]["caches"]

deltaOverride:
Optional runtime override injected at call time.
If absent, staging delays are computed automatically
(Case‑2 / Case‑3), including Case‑3 coverage diff and worst‑case
delta. If set to a positive integer (milliseconds), that value
is used as ``_pending[ctype]["delta"]`` for every staged update
and no Case‑3 delta computation is performed. Newly introduced
cache‑types are staged with this delay. The admin/user is
responsible for choosing a safe value.

Behavior by case:

• New cache‑type:
Expand All @@ -1264,6 +1274,15 @@ def changeConfig(self, newCf):
# Get the new config
config = newCf.get()
new = config.get("kram", {}).get("caches", {})
if deltaOverride is not None:
try:
deltaOverride = int(deltaOverride)
if deltaOverride <= 0:
raise ValueError
except (TypeError, ValueError) as e:
raise KramConfigurationError(
f"Invalid kram.acceptDeltaOverride: {deltaOverride!r}"
) from e
newRecords = self._validateCtypConfig(new)

# Case 3 coverage aware logic
Expand All @@ -1274,11 +1293,11 @@ def changeConfig(self, newCf):
# Validate coverage (no coverage holes)
self._validateCoverage(oldGraph, newGraph, new)

# Compute coverage diff
coverageDiff = self._computeCoverageDiff(oldGraph, newGraph)

# Compute worst-case delta across coverage
deltaCase3 = self._computeWorstCaseDelta(coverageDiff, old, new)
if deltaOverride is None:
coverageDiff = self._computeCoverageDiff(oldGraph, newGraph)
deltaCase3 = self._computeWorstCaseDelta(coverageDiff, old, new)
else:
deltaCase3 = None

# Get the smallest old accept windows so that it cannot accept
# messages earlier than any existing cache‑type
Expand All @@ -1304,40 +1323,32 @@ def changeConfig(self, newCf):
# Newly introduced cache
if ctype not in old:

# No expansion detected in the coverage graph
if deltaCase3 == 0:
# Safe to apply immediately
if deltaOverride is not None:
delta = deltaOverride
elif deltaCase3 == 0:
rec = newrec
self.db.kramCTYP.pin(ctype, rec)

# Pattern in the coverage graph expanded, accept-window increases must be staged
continue
else:
# Stage accept windows using Case 3 delta
# Get staging start time
start = helping.fromIso8601(helping.nowIso8601()).timestamp() * 1000

# Populate pending with the new values
self._pending[ctype] = {
"d_new": d_new,
"sl_new": sl_new,
"ll_new": ll_new,
"xl_new": xl_new,
"start": start,
"delta": deltaCase3,
}

# Populate the new Cache record, note that pruning values are immediately updated
# while we use the smallest accept-window values determined earlier
rec = CacheTypeRecord(
d=d_new,
sl=min_sl, ll=min_ll, xl=min_xl,
psl=max(psl_new, sl_new),
pll=max(pll_new, ll_new),
pxl=max(pxl_new, xl_new),
)

# Update the cache record inside db
self.db.kramCTYP.pin(ctype, rec)
delta = deltaCase3

start = helping.fromIso8601(helping.nowIso8601()).timestamp() * 1000
self._pending[ctype] = {
"d_new": d_new,
"sl_new": sl_new,
"ll_new": ll_new,
"xl_new": xl_new,
"start": start,
"delta": delta,
}
rec = CacheTypeRecord(
d=d_new,
sl=min_sl, ll=min_ll, xl=min_xl,
psl=max(psl_new, sl_new),
pll=max(pll_new, ll_new),
pxl=max(pxl_new, xl_new),
)
self.db.kramCTYP.pin(ctype, rec)
continue

# Cache is already in old config, determine if case 1 or case 2
Expand Down Expand Up @@ -1378,13 +1389,12 @@ def changeConfig(self, newCf):
d_xl = max(0, xl_new - xl_old)
deltaCase2 = max(d_sl, d_ll, d_xl)

# Unified delta ensures safety across Case 2 and Case 3
delta = max(deltaCase2, deltaCase3)
if deltaOverride is None:
delta = max(deltaCase2, deltaCase3)
else:
delta = deltaOverride

# Get the start time of the change
start = helping.fromIso8601(helping.nowIso8601()).timestamp() * 1000

# Populate pending with the new values
self._pending[ctype] = {
"d_new": d_new,
"sl_new": sl_new,
Expand Down
2 changes: 1 addition & 1 deletion src/keri/kering.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
VEREX2 = ( b'(?P<proto2>[A-Z]{4})'
b'(?P<pmajor2>[0-9A-Za-z_-])(?P<pminor2>[0-9A-Za-z_-]{2})'
b'(?P<gmajor2>[0-9A-Za-z_-])(?P<gminor2>[0-9A-Za-z_-]{2})'
b'(?P<kind2>[A-Z]{4})(?P<size2>[0-9A-Za-z_-]{4})\.')
b'(?P<kind2>[A-Z]{4})(?P<size2>[0-9A-Za-z_-]{4})\\.')

VEREX = VEREX2 + b'|' + VEREX1

Expand Down
124 changes: 124 additions & 0 deletions tests/core/test_kraming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2743,6 +2743,130 @@ def test_dynamic_cache_increase(fakeHelpingClock):
assert "~" not in kramer._pending


def test_change_config_accept_delta_override_larger(fakeHelpingClock):
"""Runtime acceptDeltaOverride longer than computed delta delays reconcileConfig."""
clock = fakeHelpingClock
salt_receiver = Salter(raw=b'0123456789abcdeg').qb64

with openHby(name="receiver", base="test", salt=salt_receiver, temp=True) as receiverHby:
old_cfg = {
"kram": {
"enabled": True,
"caches": {"~": [1000, 1000, 1000, 1000, 1000, 1000, 1000]},
}
}
with openCF(name="kram", base="test", temp=True) as cf:
cf.put(old_cfg)
kramer = Kramer(db=receiverHby.db, cf=cf)

new_cfg = {
"kram": {
"enabled": True,
"caches": {"~": [1000, 5000, 5000, 5000, 5000, 5000, 5000]},
}
}
cf.put(new_cfg)
kramer.changeConfig(cf, deltaOverride=10_000)

pend = kramer._pending["~"]
assert pend["delta"] == 10_000

clock.advance(seconds=4)
kramer.reconcileConfig()
rec = receiverHby.db.kramCTYP.get("~")
assert rec.sl == 1000
assert "~" in kramer._pending

clock.advance(seconds=6)
kramer.reconcileConfig()
rec = receiverHby.db.kramCTYP.get("~")
assert rec.sl == 5000
assert "~" not in kramer._pending


def test_change_config_accept_delta_override_smaller(fakeHelpingClock):
"""Runtime acceptDeltaOverride shorter than computed delta allows earlier reconcileConfig."""
clock = fakeHelpingClock
salt_receiver = Salter(raw=b'0123456789abcdeg').qb64

with openHby(name="receiver", base="test", salt=salt_receiver, temp=True) as receiverHby:
old_cfg = {
"kram": {
"enabled": True,
"caches": {"~": [1000, 1000, 1000, 1000, 1000, 1000, 1000]},
}
}
with openCF(name="kram", base="test", temp=True) as cf:
cf.put(old_cfg)
kramer = Kramer(db=receiverHby.db, cf=cf)

new_cfg = {
"kram": {
"enabled": True,
"caches": {"~": [1000, 5000, 5000, 5000, 5000, 5000, 5000]},
}
}
cf.put(new_cfg)
kramer.changeConfig(cf, deltaOverride="2000")

assert kramer._pending["~"]["delta"] == 2000

clock.advance(seconds=1)
kramer.reconcileConfig()
rec = receiverHby.db.kramCTYP.get("~")
assert rec.sl == 1000
assert "~" in kramer._pending

clock.advance(seconds=1)
kramer.reconcileConfig()
rec = receiverHby.db.kramCTYP.get("~")
assert rec.sl == 5000
assert "~" not in kramer._pending


def test_change_config_accept_delta_invalid():
"""Non-integer or non-positive acceptDeltaOverride raises KramConfigurationError."""
salt_receiver = Salter(raw=b'0123456789abcdeg').qb64

with openHby(name="receiver", base="test", salt=salt_receiver, temp=True) as receiverHby:
old_cfg = {
"kram": {
"enabled": True,
"caches": {"~": [1000, 1000, 1000, 1000, 1000, 1000, 1000]},
}
}
with openCF(name="kram", base="test", temp=True) as cf:
cf.put(old_cfg)
kramer = Kramer(db=receiverHby.db, cf=cf)

cf.put({
"kram": {
"enabled": True,
"caches": {"~": [1000, 5000, 5000, 5000, 5000, 5000, 5000]},
}
})
with pytest.raises(KramConfigurationError):
kramer.changeConfig(cf, deltaOverride=-1)

cf.put({
"kram": {
"enabled": True,
"caches": {"~": [1000, 5000, 5000, 5000, 5000, 5000, 5000]},
}
})
with pytest.raises(KramConfigurationError):
kramer.changeConfig(cf, deltaOverride=0)

cf.put({
"kram": {
"enabled": True,
"caches": {"~": [1000, 5000, 5000, 5000, 5000, 5000, 5000]},
}
})
with pytest.raises(KramConfigurationError):
kramer.changeConfig(cf, deltaOverride="notint")


def test_dynamic_cache_decrease(fakeHelpingClock):
"""
Tests that Kramer.changeConfig() correctly applies:
Expand Down
Loading