fix: require participantId on newReliabilityManager

This commit is contained in:
darshankabariya 2026-05-08 03:42:46 +05:30
parent 881d8cb359
commit abe857b1de
No known key found for this signature in database
GPG Key ID: 9A92CCD9899F0D22
5 changed files with 47 additions and 34 deletions

View File

@ -33,7 +33,11 @@ proc destroyShared(self: ptr SdsLifecycleRequest) =
proc createReliabilityManager(
appCallbacks: AppCallbacks = nil
): Future[Result[ReliabilityManager, string]] {.async.} =
let rm = newReliabilityManager().valueOr:
# TODO: thread `participantId` through SdsNewReliabilityManager FFI input
# and remove this hardcoded "". Empty id silently disables SDS-R; this is
# acceptable as a temporary FFI-only fallback until sds-go-bindings and
# logos-delivery's C-side caller are updated to supply the identity.
let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr:
error "Failed creating reliability manager", error = error
return err("Failed creating reliability manager: " & $error)

View File

@ -5,15 +5,16 @@ import sds/[types, protobuf, sds_utils, rolling_bloom_filter]
export types, protobuf, sds_utils, rolling_bloom_filter
proc newReliabilityManager*(
participantId: SdsParticipantID,
config: ReliabilityConfig = defaultConfig(),
participantId: SdsParticipantID = "".SdsParticipantID,
persistence: Persistence = noOpPersistence(),
): Result[ReliabilityManager, ReliabilityError] =
## Creates a new multi-channel ReliabilityManager.
## `participantId` is REQUIRED (see `ReliabilityManager.new`).
## `persistence` defaults to a no-op backend; supply a real one to durably
## store SDS state across restarts.
try:
let rm = ReliabilityManager.new(config, participantId, persistence)
let rm = ReliabilityManager.new(participantId, config, persistence)
return ok(rm)
except Exception:
error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg()

View File

@ -27,10 +27,16 @@ type ReliabilityManager* = ref object
proc new*(
T: type ReliabilityManager,
participantId: SdsParticipantID,
config: ReliabilityConfig,
participantId: SdsParticipantID = "".SdsParticipantID,
persistence: Persistence = noOpPersistence(),
): T =
## `participantId` is REQUIRED — it is the per-manager identity SDS-R uses
## to populate response groups and decide which incoming repair requests
## this manager is authoritative for. The Reliable Channel API spec
## (`senderId`) likewise lists it as required. An empty id silently
## disables SDS-R; callers that genuinely want plain SDS without repair
## must pass `""` explicitly.
let rm = T(
channels: initTable[SdsChannelID, ChannelContext](),
config: config,

View File

@ -10,7 +10,7 @@ suite "Persistence: write → restart → read-back":
test "outgoing buffer survives restart":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
let wrapped = rm1.wrapOutgoingMessage(@[1.byte, 2, 3], "msg-1", testChannel)
check wrapped.isOk()
@ -20,7 +20,7 @@ suite "Persistence: write → restart → read-back":
# Simulate restart: fresh manager, same backend
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
let buf = rm2.getOutgoingBuffer(testChannel)
check buf.len == 1
@ -30,21 +30,21 @@ suite "Persistence: write → restart → read-back":
test "lamport clock survives restart":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
rm1.updateLamportTimestamp(42, testChannel)
check store.lamports[testChannel] == 43 # max(42, 0) + 1
rm1.cleanup()
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
check rm2.channels[testChannel].lamportTimestamp == 43
test "delivered messages survive restart and rebuild bloom":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
let msg = SdsMessage.init(
messageId = "delivered-1",
@ -60,7 +60,7 @@ suite "Persistence: write → restart → read-back":
rm1.cleanup()
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
let ch = rm2.channels[testChannel]
check ch.messageHistory.len == 1
@ -71,7 +71,7 @@ suite "Persistence: write → restart → read-back":
test "ack removes outgoing entry from persistence":
let store = newInMemoryStore()
let p = newInMemoryPersistence(store)
let rm = newReliabilityManager(persistence = p).get()
let rm = newReliabilityManager(participantId = "alice", persistence =p).get()
check rm.ensureChannel(testChannel).isOk()
discard rm.wrapOutgoingMessage(@[1.byte], "msg-x", testChannel)
check "msg-x" in store.outgoing[testChannel]
@ -96,7 +96,7 @@ suite "Persistence: write → restart → read-back":
# drop, not N per-row removes — otherwise SQLite eats N fsyncs per drop.
let store = newInMemoryStore()
let p = newInMemoryPersistence(store)
let rm = newReliabilityManager(persistence = p).get()
let rm = newReliabilityManager(participantId = "alice", persistence =p).get()
check rm.ensureChannel(testChannel).isOk()
discard rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel)
check store.outgoing[testChannel].len == 1
@ -113,7 +113,7 @@ suite "Persistence: write → restart → read-back":
rm.cleanup()
test "noOpPersistence keeps existing manager working":
let rm = newReliabilityManager().get() # default no-op
let rm = newReliabilityManager(participantId = "alice").get() # default no-op persistence
check rm.ensureChannel(testChannel).isOk()
let wrapped = rm.wrapOutgoingMessage(@[1.byte], "msg-n", testChannel)
check wrapped.isOk()
@ -123,7 +123,7 @@ suite "Persistence: write → restart → read-back":
test "continue operating after restart: lamport stays monotonic":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
discard rm1.wrapOutgoingMessage(@[1.byte], "m1", testChannel)
let lamportAfterSession1 = store.lamports[testChannel]
@ -132,7 +132,7 @@ suite "Persistence: write → restart → read-back":
# Restart and send another message — lamport must not regress.
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
check rm2.channels[testChannel].lamportTimestamp == lamportAfterSession1
discard rm2.wrapOutgoingMessage(@[2.byte], "m2", testChannel)
@ -144,14 +144,14 @@ suite "Persistence: write → restart → read-back":
let store = newInMemoryStore()
for i in 1 .. 3:
let p = newInMemoryPersistence(store)
let rm = newReliabilityManager(persistence = p).get()
let rm = newReliabilityManager(participantId = "alice", persistence =p).get()
check rm.ensureChannel(testChannel).isOk()
discard rm.wrapOutgoingMessage(@[byte(i)], "m" & $i, testChannel)
rm.cleanup()
# Final session: all three messages must be in the buffer.
let pFinal = newInMemoryPersistence(store)
let rmFinal = newReliabilityManager(persistence = pFinal).get()
let rmFinal = newReliabilityManager(participantId = "alice", persistence =pFinal).get()
check rmFinal.ensureChannel(testChannel).isOk()
let buf = rmFinal.getOutgoingBuffer(testChannel)
check buf.len == 3
@ -166,7 +166,7 @@ suite "Persistence: write → restart → read-back":
test "incoming dep-waiting buffer survives restart with missingDeps intact":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
# Receive a message whose causal-history references an unknown predecessor.
@ -186,7 +186,7 @@ suite "Persistence: write → restart → read-back":
# Restart — buffered message and its missing-deps set must be back.
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
let inbuf = rm2.getIncomingBuffer(testChannel)
check "msg-with-deps" in inbuf
@ -198,7 +198,7 @@ suite "Persistence: write → restart → read-back":
# of the same channelId after restart picks up the old timestamp.
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
discard rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel)
check store.lamports[testChannel] > 0
@ -208,7 +208,7 @@ suite "Persistence: write → restart → read-back":
# Recreate the same channelId after a restart — must start fresh.
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
check rm2.channels[testChannel].lamportTimestamp == 0
check rm2.getOutgoingBuffer(testChannel).len == 0
@ -256,7 +256,7 @@ suite "Persistence: write → restart → read-back":
smallCfg.bloomFilterCapacity = 3
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(config = smallCfg, persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", config = smallCfg, persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
# Add 5 delivered messages — first 2 should be evicted by FIFO.
for i in 1 .. 5:
@ -277,7 +277,7 @@ suite "Persistence: write → restart → read-back":
# Restart — evicted entries must NOT come back; survivors keep order.
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(config = smallCfg, persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", config = smallCfg, persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
let history = rm2.channels[testChannel].messageHistory
check history.len == 3
@ -299,7 +299,7 @@ suite "Persistence: write → restart → read-back":
test "dep-clear cascade resumes correctly across a restart":
let store = newInMemoryStore()
let p1 = newInMemoryPersistence(store)
let rm1 = newReliabilityManager(persistence = p1).get()
let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get()
check rm1.ensureChannel(testChannel).isOk()
# Receive c (deps on b), then b (deps on a). Both must buffer.
@ -323,7 +323,7 @@ suite "Persistence: write → restart → read-back":
# Restart — both still buffered, with intact missingDeps.
let p2 = newInMemoryPersistence(store)
let rm2 = newReliabilityManager(persistence = p2).get()
let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get()
check rm2.ensureChannel(testChannel).isOk()
let inbuf = rm2.getIncomingBuffer(testChannel)
check "c" in inbuf

View File

@ -22,7 +22,7 @@ suite "Core Operations":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
@ -119,7 +119,7 @@ suite "Reliability Mechanisms":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
@ -537,7 +537,7 @@ suite "Periodic Tasks & Buffer Management":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
@ -599,7 +599,7 @@ suite "Periodic Tasks & Buffer Management":
config.bloomFilterCapacity = 2 # Small capacity for testing
config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager(config)
let rmResultP = newReliabilityManager(participantId = "alice", config = config)
check rmResultP.isOk()
let rm = rmResultP.get()
check rm.ensureChannel(testChannel).isOk()
@ -675,7 +675,7 @@ suite "Special Cases Handling":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
@ -767,7 +767,7 @@ suite "Special Cases Handling":
suite "cleanup":
test "cleanup works correctly":
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
let rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
@ -789,7 +789,7 @@ suite "Multi-Channel ReliabilityManager Tests":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
@ -1371,7 +1371,9 @@ suite "SDS-R: Edge Cases and Defensive Branches":
suite "SDS-R: Lifecycle and State":
test "empty participantId disables outgoing repair creation":
let rm = newReliabilityManager().get() # empty participantId
# Explicitly pass empty id to exercise the SDS-R no-op branch. Required-arg
# signature means callers can no longer accidentally land here.
let rm = newReliabilityManager(participantId = "".SdsParticipantID).get()
defer: rm.cleanup()
check rm.ensureChannel(testChannel).isOk()
@ -1671,7 +1673,7 @@ proc addPeer(
participantId: SdsParticipantID,
config: ReliabilityConfig = defaultConfig(),
): ReliabilityManager =
let rm = newReliabilityManager(config, participantId).get()
let rm = newReliabilityManager(participantId, config).get()
doAssert rm.ensureChannel(testChannel).isOk()
bus.peers[participantId] = rm
bus.delivered[participantId] = @[]