From 2e9a7683f0e180bf112135fae3a3803eed8490d4 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Sun, 10 May 2026 13:24:42 +0530 Subject: [PATCH] fix: require participantId on newReliabilityManager (#67) --- .../requests/sds_lifecycle_request.nim | 6 ++- sds.nim | 5 ++- sds/types/reliability_manager.nim | 8 +++- tests/test_persistence.nim | 42 +++++++++---------- tests/test_reliability.nim | 20 +++++---- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim index a0f3adb..1ca6674 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -33,7 +33,11 @@ proc destroyShared(self: ptr SdsLifecycleRequest) = proc createReliabilityManager( appCallbacks: AppCallbacks = nil ): Future[Result[ReliabilityManager, string]] {.async.} = - let rm = newReliabilityManager().valueOr: + # TODO: thread `participantId` through SdsNewReliabilityManager FFI input + # and remove this hardcoded "". Empty id silently disables SDS-R; this is + # acceptable as a temporary FFI-only fallback until sds-go-bindings and + # logos-delivery's C-side caller are updated to supply the identity. + let rm = newReliabilityManager(participantId = "".SdsParticipantID).valueOr: error "Failed creating reliability manager", error = error return err("Failed creating reliability manager: " & $error) diff --git a/sds.nim b/sds.nim index e40e421..5e1ba09 100644 --- a/sds.nim +++ b/sds.nim @@ -5,15 +5,16 @@ import sds/[types, protobuf, sds_utils, rolling_bloom_filter] export types, protobuf, sds_utils, rolling_bloom_filter proc newReliabilityManager*( + participantId: SdsParticipantID, config: ReliabilityConfig = defaultConfig(), - participantId: SdsParticipantID = "".SdsParticipantID, persistence: Persistence = noOpPersistence(), ): Result[ReliabilityManager, ReliabilityError] = ## Creates a new multi-channel ReliabilityManager. + ## `participantId` is REQUIRED (see `ReliabilityManager.new`). ## `persistence` defaults to a no-op backend; supply a real one to durably ## store SDS state across restarts. try: - let rm = ReliabilityManager.new(config, participantId, persistence) + let rm = ReliabilityManager.new(participantId, config, persistence) return ok(rm) except Exception: error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg() diff --git a/sds/types/reliability_manager.nim b/sds/types/reliability_manager.nim index 2c12596..3c42850 100644 --- a/sds/types/reliability_manager.nim +++ b/sds/types/reliability_manager.nim @@ -27,10 +27,16 @@ type ReliabilityManager* = ref object proc new*( T: type ReliabilityManager, + participantId: SdsParticipantID, config: ReliabilityConfig, - participantId: SdsParticipantID = "".SdsParticipantID, persistence: Persistence = noOpPersistence(), ): T = + ## `participantId` is REQUIRED — it is the per-manager identity SDS-R uses + ## to populate response groups and decide which incoming repair requests + ## this manager is authoritative for. The Reliable Channel API spec + ## (`senderId`) likewise lists it as required. An empty id silently + ## disables SDS-R; callers that genuinely want plain SDS without repair + ## must pass `""` explicitly. let rm = T( channels: initTable[SdsChannelID, ChannelContext](), config: config, diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index c2ab4fe..4d3fc3c 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -10,7 +10,7 @@ suite "Persistence: write → restart → read-back": test "outgoing buffer survives restart": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() let wrapped = rm1.wrapOutgoingMessage(@[1.byte, 2, 3], "msg-1", testChannel) check wrapped.isOk() @@ -20,7 +20,7 @@ suite "Persistence: write → restart → read-back": # Simulate restart: fresh manager, same backend let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() let buf = rm2.getOutgoingBuffer(testChannel) check buf.len == 1 @@ -30,21 +30,21 @@ suite "Persistence: write → restart → read-back": test "lamport clock survives restart": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() rm1.updateLamportTimestamp(42, testChannel) check store.lamports[testChannel] == 43 # max(42, 0) + 1 rm1.cleanup() let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() check rm2.channels[testChannel].lamportTimestamp == 43 test "delivered messages survive restart and rebuild bloom": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() let msg = SdsMessage.init( messageId = "delivered-1", @@ -60,7 +60,7 @@ suite "Persistence: write → restart → read-back": rm1.cleanup() let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() let ch = rm2.channels[testChannel] check ch.messageHistory.len == 1 @@ -71,7 +71,7 @@ suite "Persistence: write → restart → read-back": test "ack removes outgoing entry from persistence": let store = newInMemoryStore() let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(persistence = p).get() + let rm = newReliabilityManager(participantId = "alice", persistence =p).get() check rm.ensureChannel(testChannel).isOk() discard rm.wrapOutgoingMessage(@[1.byte], "msg-x", testChannel) check "msg-x" in store.outgoing[testChannel] @@ -96,7 +96,7 @@ suite "Persistence: write → restart → read-back": # drop, not N per-row removes — otherwise SQLite eats N fsyncs per drop. let store = newInMemoryStore() let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(persistence = p).get() + let rm = newReliabilityManager(participantId = "alice", persistence =p).get() check rm.ensureChannel(testChannel).isOk() discard rm.wrapOutgoingMessage(@[1.byte], "msg-r", testChannel) check store.outgoing[testChannel].len == 1 @@ -113,7 +113,7 @@ suite "Persistence: write → restart → read-back": rm.cleanup() test "noOpPersistence keeps existing manager working": - let rm = newReliabilityManager().get() # default no-op + let rm = newReliabilityManager(participantId = "alice").get() # default no-op persistence check rm.ensureChannel(testChannel).isOk() let wrapped = rm.wrapOutgoingMessage(@[1.byte], "msg-n", testChannel) check wrapped.isOk() @@ -123,7 +123,7 @@ suite "Persistence: write → restart → read-back": test "continue operating after restart: lamport stays monotonic": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() discard rm1.wrapOutgoingMessage(@[1.byte], "m1", testChannel) let lamportAfterSession1 = store.lamports[testChannel] @@ -132,7 +132,7 @@ suite "Persistence: write → restart → read-back": # Restart and send another message — lamport must not regress. let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() check rm2.channels[testChannel].lamportTimestamp == lamportAfterSession1 discard rm2.wrapOutgoingMessage(@[2.byte], "m2", testChannel) @@ -144,14 +144,14 @@ suite "Persistence: write → restart → read-back": let store = newInMemoryStore() for i in 1 .. 3: let p = newInMemoryPersistence(store) - let rm = newReliabilityManager(persistence = p).get() + let rm = newReliabilityManager(participantId = "alice", persistence =p).get() check rm.ensureChannel(testChannel).isOk() discard rm.wrapOutgoingMessage(@[byte(i)], "m" & $i, testChannel) rm.cleanup() # Final session: all three messages must be in the buffer. let pFinal = newInMemoryPersistence(store) - let rmFinal = newReliabilityManager(persistence = pFinal).get() + let rmFinal = newReliabilityManager(participantId = "alice", persistence =pFinal).get() check rmFinal.ensureChannel(testChannel).isOk() let buf = rmFinal.getOutgoingBuffer(testChannel) check buf.len == 3 @@ -166,7 +166,7 @@ suite "Persistence: write → restart → read-back": test "incoming dep-waiting buffer survives restart with missingDeps intact": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() # Receive a message whose causal-history references an unknown predecessor. @@ -186,7 +186,7 @@ suite "Persistence: write → restart → read-back": # Restart — buffered message and its missing-deps set must be back. let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() let inbuf = rm2.getIncomingBuffer(testChannel) check "msg-with-deps" in inbuf @@ -198,7 +198,7 @@ suite "Persistence: write → restart → read-back": # of the same channelId after restart picks up the old timestamp. let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() discard rm1.wrapOutgoingMessage(@[1.byte], "m-old", testChannel) check store.lamports[testChannel] > 0 @@ -208,7 +208,7 @@ suite "Persistence: write → restart → read-back": # Recreate the same channelId after a restart — must start fresh. let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() check rm2.channels[testChannel].lamportTimestamp == 0 check rm2.getOutgoingBuffer(testChannel).len == 0 @@ -256,7 +256,7 @@ suite "Persistence: write → restart → read-back": smallCfg.bloomFilterCapacity = 3 let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(config = smallCfg, persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", config = smallCfg, persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() # Add 5 delivered messages — first 2 should be evicted by FIFO. for i in 1 .. 5: @@ -277,7 +277,7 @@ suite "Persistence: write → restart → read-back": # Restart — evicted entries must NOT come back; survivors keep order. let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(config = smallCfg, persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", config = smallCfg, persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() let history = rm2.channels[testChannel].messageHistory check history.len == 3 @@ -299,7 +299,7 @@ suite "Persistence: write → restart → read-back": test "dep-clear cascade resumes correctly across a restart": let store = newInMemoryStore() let p1 = newInMemoryPersistence(store) - let rm1 = newReliabilityManager(persistence = p1).get() + let rm1 = newReliabilityManager(participantId = "alice", persistence =p1).get() check rm1.ensureChannel(testChannel).isOk() # Receive c (deps on b), then b (deps on a). Both must buffer. @@ -323,7 +323,7 @@ suite "Persistence: write → restart → read-back": # Restart — both still buffered, with intact missingDeps. let p2 = newInMemoryPersistence(store) - let rm2 = newReliabilityManager(persistence = p2).get() + let rm2 = newReliabilityManager(participantId = "alice", persistence =p2).get() check rm2.ensureChannel(testChannel).isOk() let inbuf = rm2.getIncomingBuffer(testChannel) check "c" in inbuf diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 290ac9c..e191de7 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -22,7 +22,7 @@ suite "Core Operations": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() rm = rmResult.get() check rm.ensureChannel(testChannel).isOk() @@ -119,7 +119,7 @@ suite "Reliability Mechanisms": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() rm = rmResult.get() check rm.ensureChannel(testChannel).isOk() @@ -537,7 +537,7 @@ suite "Periodic Tasks & Buffer Management": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() rm = rmResult.get() check rm.ensureChannel(testChannel).isOk() @@ -599,7 +599,7 @@ suite "Periodic Tasks & Buffer Management": config.bloomFilterCapacity = 2 # Small capacity for testing config.maxResendAttempts = 3 # Set a low number of max attempts - let rmResultP = newReliabilityManager(config) + let rmResultP = newReliabilityManager(participantId = "alice", config = config) check rmResultP.isOk() let rm = rmResultP.get() check rm.ensureChannel(testChannel).isOk() @@ -675,7 +675,7 @@ suite "Special Cases Handling": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() rm = rmResult.get() check rm.ensureChannel(testChannel).isOk() @@ -767,7 +767,7 @@ suite "Special Cases Handling": suite "cleanup": test "cleanup works correctly": - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() let rm = rmResult.get() check rm.ensureChannel(testChannel).isOk() @@ -789,7 +789,7 @@ suite "Multi-Channel ReliabilityManager Tests": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager() + let rmResult = newReliabilityManager(participantId = "alice") check rmResult.isOk() rm = rmResult.get() @@ -1371,7 +1371,9 @@ suite "SDS-R: Edge Cases and Defensive Branches": suite "SDS-R: Lifecycle and State": test "empty participantId disables outgoing repair creation": - let rm = newReliabilityManager().get() # empty participantId + # Explicitly pass empty id to exercise the SDS-R no-op branch. Required-arg + # signature means callers can no longer accidentally land here. + let rm = newReliabilityManager(participantId = "".SdsParticipantID).get() defer: rm.cleanup() check rm.ensureChannel(testChannel).isOk() @@ -1671,7 +1673,7 @@ proc addPeer( participantId: SdsParticipantID, config: ReliabilityConfig = defaultConfig(), ): ReliabilityManager = - let rm = newReliabilityManager(config, participantId).get() + let rm = newReliabilityManager(participantId, config).get() doAssert rm.ensureChannel(testChannel).isOk() bus.peers[participantId] = rm bus.delivered[participantId] = @[]