diff --git a/sds.nim b/sds.nim index c4a1482..f48b531 100644 --- a/sds.nim +++ b/sds.nim @@ -6,7 +6,7 @@ export types, protobuf, sds_utils, rolling_bloom_filter proc newReliabilityManager*( config: ReliabilityConfig = defaultConfig(), - participantId: SdsParticipantID = "", + participantId: SdsParticipantID = "".SdsParticipantID, ): Result[ReliabilityManager, ReliabilityError] = ## Creates a new multi-channel ReliabilityManager. try: @@ -94,8 +94,8 @@ proc wrapOutgoingMessage*( let now = getTime() var expiredKeys: seq[SdsMessageID] = @[] for msgId, repairEntry in channel.outgoingRepairBuffer: - if now >= repairEntry.tReq and repairReqs.len < rm.config.maxRepairRequests: - repairReqs.add(repairEntry.entry) + if now >= repairEntry.minTimeRepairReq and repairReqs.len < rm.config.maxRepairRequests: + repairReqs.add(repairEntry.outHistEntry) expiredKeys.add(msgId) for key in expiredKeys: channel.outgoingRepairBuffer.del(key) @@ -225,9 +225,9 @@ proc unwrapReceivedMessage*( repairEntry.messageId, rm.config.repairTMax ) channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry( - entry: repairEntry, + inHistEntry: repairEntry, cachedMessage: channel.messageCache[repairEntry.messageId], - tResp: now + tResp, + minTimeRepairResp: now + tResp, ) var missingDeps = rm.checkDependencies(msg.causalHistory, channelId) @@ -268,8 +268,8 @@ proc unwrapReceivedMessage*( rm.config.repairTMin, rm.config.repairTMax ) channel.outgoingRepairBuffer[dep.messageId] = OutgoingRepairEntry( - entry: dep, - tReq: now + tReq, + outHistEntry: dep, + minTimeRepairReq: now + tReq, ) return ok((msg.content, missingDeps, channelId)) @@ -392,7 +392,7 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} = # Check incoming repair buffer for expired T_resp (time to rebroadcast) var toRebroadcast: seq[SdsMessageID] = @[] for msgId, entry in channel.incomingRepairBuffer: - if now >= entry.tResp: + if now >= entry.minTimeRepairResp: toRebroadcast.add(msgId) for msgId in toRebroadcast: @@ -405,7 +405,7 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} = var toRemove: seq[SdsMessageID] = @[] let tMaxDuration = rm.config.repairTMax for msgId, entry in channel.outgoingRepairBuffer: - if now - entry.tReq > tMaxDuration: + if now - entry.minTimeRepairReq > tMaxDuration: toRemove.add(msgId) for msgId in toRemove: channel.outgoingRepairBuffer.del(msgId) diff --git a/sds/protobuf.nim b/sds/protobuf.nim index 24a95d7..916bf18 100644 --- a/sds/protobuf.nim +++ b/sds/protobuf.nim @@ -11,7 +11,7 @@ proc encodeHistoryEntry*(entry: HistoryEntry): ProtoBuffer = if entry.retrievalHint.len > 0: entryPb.write(2, entry.retrievalHint) if entry.senderId.len > 0: - entryPb.write(3, entry.senderId) + entryPb.write(3, entry.senderId.string) entryPb.finish() entryPb @@ -20,7 +20,9 @@ proc decodeHistoryEntry*(entryPb: ProtoBuffer): ProtobufResult[HistoryEntry] = if not ?entryPb.getField(1, entry.messageId): return err(ProtobufError.missingRequiredField("HistoryEntry.messageId")) discard entryPb.getField(2, entry.retrievalHint) - discard entryPb.getField(3, entry.senderId) + var senderIdStr: string + if entryPb.getField(3, senderIdStr).valueOr(false): + entry.senderId = senderIdStr.SdsParticipantID ok(entry) proc encode*(msg: SdsMessage): ProtoBuffer = @@ -38,7 +40,7 @@ proc encode*(msg: SdsMessage): ProtoBuffer = pb.write(6, msg.bloomFilter) if msg.senderId.len > 0: - pb.write(7, msg.senderId) + pb.write(7, msg.senderId.string) for entry in msg.repairRequest: let entryPb = encodeHistoryEntry(entry) @@ -85,7 +87,9 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] = msg.bloomFilter = @[] # Empty if not present # SDS-R: decode senderId (field 7, optional) - discard pb.getField(7, msg.senderId) + var msgSenderIdStr: string + if pb.getField(7, msgSenderIdStr).valueOr(false): + msg.senderId = msgSenderIdStr.SdsParticipantID # SDS-R: decode repair request (field 13, optional) var repairBuffers: seq[seq[byte]] diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 7e4eba6..2ff9ad4 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -84,7 +84,7 @@ proc computeTReq*( ): Duration = ## Computes the repair request backoff duration per SDS-R spec: ## T_req = hash(participant_id, message_id) % (T_max - T_min) + T_min - let h = abs(hash(participantId & messageId)) + let h = abs(hash(participantId.string & messageId)) let rangeMs = tMax.inMilliseconds - tMin.inMilliseconds if rangeMs <= 0: return tMin @@ -122,8 +122,8 @@ proc isInResponseGroup*( ## hash(participant_id, message_id) % num_groups == hash(sender_id, message_id) % num_groups if numResponseGroups <= 1: return true # All participants in the same group - let myGroup = abs(hash(participantId & messageId)) mod numResponseGroups - let senderGroup = abs(hash(senderId & messageId)) mod numResponseGroups + let myGroup = abs(hash(participantId.string & messageId)) mod numResponseGroups + let senderGroup = abs(hash(senderId.string & messageId)) mod numResponseGroups myGroup == senderGroup proc getRecentHistoryEntries*( diff --git a/sds/types/history_entry.nim b/sds/types/history_entry.nim index b55fc20..d06afac 100644 --- a/sds/types/history_entry.nim +++ b/sds/types/history_entry.nim @@ -1,14 +1,15 @@ import ./sds_message_id +export sds_message_id type HistoryEntry* = object messageId*: SdsMessageID retrievalHint*: seq[byte] ## Optional hint for efficient retrieval (e.g., Waku message hash) - senderId*: string ## Original message sender's participant ID (SDS-R) + senderId*: SdsParticipantID ## Original message sender's participant ID (SDS-R) proc init*( T: type HistoryEntry, messageId: SdsMessageID, retrievalHint: seq[byte] = @[], - senderId: string = "", + senderId: SdsParticipantID = "".SdsParticipantID, ): T = return T(messageId: messageId, retrievalHint: retrievalHint, senderId: senderId) diff --git a/sds/types/reliability_manager.nim b/sds/types/reliability_manager.nim index 5545859..d28ee5d 100644 --- a/sds/types/reliability_manager.nim +++ b/sds/types/reliability_manager.nim @@ -23,7 +23,7 @@ type ReliabilityManager* = ref object proc new*( T: type ReliabilityManager, config: ReliabilityConfig, - participantId: SdsParticipantID = "", + participantId: SdsParticipantID = "".SdsParticipantID, ): T = let rm = T( channels: initTable[SdsChannelID, ChannelContext](), diff --git a/sds/types/repair_entry.nim b/sds/types/repair_entry.nim index 01f0fd5..04503b9 100644 --- a/sds/types/repair_entry.nim +++ b/sds/types/repair_entry.nim @@ -3,26 +3,34 @@ import ./history_entry export history_entry type - OutgoingRepairEntry* = object + OutgoingRepairEntry* {.requiresInit.} = object ## Entry in the outgoing repair request buffer (SDS-R). ## Tracks a missing message we want to request repair for. - entry*: HistoryEntry ## The missing history entry - tReq*: Time ## Timestamp after which we will include this in a repair request + outHistEntry*: HistoryEntry ## The missing history entry + minTimeRepairReq*: Time + ## Earliest time at which we will include this in a repair request (T_REQ in spec) - IncomingRepairEntry* = object + IncomingRepairEntry* {.requiresInit.} = object ## Entry in the incoming repair request buffer (SDS-R). ## Tracks a repair request from a remote peer that we might respond to. - entry*: HistoryEntry ## The requested history entry + inHistEntry*: HistoryEntry ## The requested history entry cachedMessage*: seq[byte] ## Full serialized SDS message for rebroadcast - tResp*: Time ## Timestamp after which we will rebroadcast + minTimeRepairResp*: Time + ## Earliest time at which we will rebroadcast (T_RESP in spec) -proc init*(T: type OutgoingRepairEntry, entry: HistoryEntry, tReq: Time): T = - return T(entry: entry, tReq: tReq) +proc init*( + T: type OutgoingRepairEntry, outHistEntry: HistoryEntry, minTimeRepairReq: Time +): T = + return T(outHistEntry: outHistEntry, minTimeRepairReq: minTimeRepairReq) proc init*( T: type IncomingRepairEntry, - entry: HistoryEntry, + inHistEntry: HistoryEntry, cachedMessage: seq[byte], - tResp: Time, + minTimeRepairResp: Time, ): T = - return T(entry: entry, cachedMessage: cachedMessage, tResp: tResp) + return T( + inHistEntry: inHistEntry, + cachedMessage: cachedMessage, + minTimeRepairResp: minTimeRepairResp, + ) diff --git a/sds/types/sds_message.nim b/sds/types/sds_message.nim index 6ab7a4f..d509124 100644 --- a/sds/types/sds_message.nim +++ b/sds/types/sds_message.nim @@ -21,7 +21,7 @@ proc init*( channelId: SdsChannelID, content: seq[byte], bloomFilter: seq[byte], - senderId: SdsParticipantID = "", + senderId: SdsParticipantID = "".SdsParticipantID, repairRequest: seq[HistoryEntry] = @[], ): T = return T( diff --git a/sds/types/sds_message_id.nim b/sds/types/sds_message_id.nim index dfeb025..05f1ab4 100644 --- a/sds/types/sds_message_id.nim +++ b/sds/types/sds_message_id.nim @@ -1,4 +1,11 @@ +import std/hashes + type SdsMessageID* = string SdsChannelID* = string - SdsParticipantID* = string + SdsParticipantID* = distinct string + +proc `==`*(a, b: SdsParticipantID): bool {.borrow.} +proc `$`*(p: SdsParticipantID): string {.borrow.} +proc len*(p: SdsParticipantID): int {.borrow.} +proc hash*(p: SdsParticipantID): Hash {.borrow.} diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 7f738c5..ec0d40f 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1,6 +1,10 @@ import unittest, results, chronos, std/[times, options, tables] import sds +# Test-only convenience: implicit string → SdsParticipantID so test fixtures +# can use string literals. Production code retains the distinct-type safety. +converter toParticipantID(s: string): SdsParticipantID = s.SdsParticipantID + const testChannel = "testChannel" # Core functionality tests @@ -895,8 +899,8 @@ suite "SDS-R: Repair Buffer Management": # Manually add an expired repair entry let channel = rm.channels[testChannel] channel.outgoingRepairBuffer["missing-msg"] = OutgoingRepairEntry( - entry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"), - tReq: getTime() - initDuration(seconds = 10), # Already expired + outHistEntry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"), + minTimeRepairReq: getTime() - initDuration(seconds = 10), # Already expired ) # Send a message — should pick up the expired repair request @@ -1177,13 +1181,13 @@ suite "SDS-R: Lifecycle and State": let channel = rm.channels[testChannel] channel.outgoingRepairBuffer["a"] = OutgoingRepairEntry( - entry: HistoryEntry(messageId: "a", senderId: "x"), - tReq: getTime(), + outHistEntry: HistoryEntry(messageId: "a", senderId: "x"), + minTimeRepairReq: getTime(), ) channel.incomingRepairBuffer["b"] = IncomingRepairEntry( - entry: HistoryEntry(messageId: "b", senderId: "y"), + inHistEntry: HistoryEntry(messageId: "b", senderId: "y"), cachedMessage: @[byte(1)], - tResp: getTime(), + minTimeRepairResp: getTime(), ) channel.messageCache["c"] = @[byte(2)] channel.messageSenders["c"] = "someone" @@ -1239,9 +1243,9 @@ suite "SDS-R: Lifecycle and State": # Carol already has M1 in history and has a pending incomingRepairBuffer entry channel.messageHistory.add("m1") channel.incomingRepairBuffer["m1"] = IncomingRepairEntry( - entry: HistoryEntry(messageId: "m1", senderId: "alice"), + inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"), cachedMessage: @[byte(1)], - tResp: getTime() + initDuration(seconds = 10), + minTimeRepairResp: getTime() + initDuration(seconds = 10), ) # A rebroadcast of M1 arrives @@ -1284,14 +1288,14 @@ suite "SDS-R: Repair Sweep": let channel = rm.channels[testChannel] channel.incomingRepairBuffer["m-ready"] = IncomingRepairEntry( - entry: HistoryEntry(messageId: "m-ready", senderId: "alice"), + inHistEntry: HistoryEntry(messageId: "m-ready", senderId: "alice"), cachedMessage: @[byte(1), 2, 3], - tResp: getTime() - initDuration(seconds = 1), # expired + minTimeRepairResp: getTime() - initDuration(seconds = 1), # expired ) channel.incomingRepairBuffer["m-not-ready"] = IncomingRepairEntry( - entry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"), + inHistEntry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"), cachedMessage: @[byte(9), 9, 9], - tResp: getTime() + initDuration(minutes = 10), # far future + minTimeRepairResp: getTime() + initDuration(minutes = 10), # far future ) rm.runRepairSweep() @@ -1312,12 +1316,12 @@ suite "SDS-R: Repair Sweep": let channel = rm.channels[testChannel] let tMax = rm.config.repairTMax channel.outgoingRepairBuffer["m-stale"] = OutgoingRepairEntry( - entry: HistoryEntry(messageId: "m-stale", senderId: "alice"), - tReq: getTime() - (tMax + tMax), # now - 2*T_max, past drop window + outHistEntry: HistoryEntry(messageId: "m-stale", senderId: "alice"), + minTimeRepairReq: getTime() - (tMax + tMax), # now - 2*T_max, past drop window ) channel.outgoingRepairBuffer["m-fresh"] = OutgoingRepairEntry( - entry: HistoryEntry(messageId: "m-fresh", senderId: "alice"), - tReq: getTime(), + outHistEntry: HistoryEntry(messageId: "m-fresh", senderId: "alice"), + minTimeRepairReq: getTime(), ) rm.runRepairSweep() @@ -1411,20 +1415,20 @@ proc broadcast( proc forceOutgoingExpired( rm: ReliabilityManager, messageId: SdsMessageID ) = - ## Push a specific outgoingRepairBuffer entry's tReq into the past so the + ## Push a specific outgoingRepairBuffer entry's minTimeRepairReq into the past so the ## next wrapOutgoingMessage will pick it up. let channel = rm.channels[testChannel] if messageId in channel.outgoingRepairBuffer: - channel.outgoingRepairBuffer[messageId].tReq = + channel.outgoingRepairBuffer[messageId].minTimeRepairReq = getTime() - initDuration(seconds = 1) proc forceIncomingExpired( rm: ReliabilityManager, messageId: SdsMessageID ) = - ## Push an incomingRepairBuffer entry's tResp into the past so runRepairSweep fires it. + ## Push an incomingRepairBuffer entry's minTimeRepairResp into the past so runRepairSweep fires it. let channel = rm.channels[testChannel] if messageId in channel.incomingRepairBuffer: - channel.incomingRepairBuffer[messageId].tResp = + channel.incomingRepairBuffer[messageId].minTimeRepairResp = getTime() - initDuration(seconds = 1) suite "SDS-R: Multi-Participant Integration":