mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-05-18 16:09:30 +00:00
feat: update according to PR.
This commit is contained in:
parent
5170c999d3
commit
5966998946
18
sds.nim
18
sds.nim
@ -6,7 +6,7 @@ export types, protobuf, sds_utils, rolling_bloom_filter
|
||||
|
||||
proc newReliabilityManager*(
|
||||
config: ReliabilityConfig = defaultConfig(),
|
||||
participantId: SdsParticipantID = "",
|
||||
participantId: SdsParticipantID = "".SdsParticipantID,
|
||||
): Result[ReliabilityManager, ReliabilityError] =
|
||||
## Creates a new multi-channel ReliabilityManager.
|
||||
try:
|
||||
@ -94,8 +94,8 @@ proc wrapOutgoingMessage*(
|
||||
let now = getTime()
|
||||
var expiredKeys: seq[SdsMessageID] = @[]
|
||||
for msgId, repairEntry in channel.outgoingRepairBuffer:
|
||||
if now >= repairEntry.tReq and repairReqs.len < rm.config.maxRepairRequests:
|
||||
repairReqs.add(repairEntry.entry)
|
||||
if now >= repairEntry.minTimeRepairReq and repairReqs.len < rm.config.maxRepairRequests:
|
||||
repairReqs.add(repairEntry.outHistEntry)
|
||||
expiredKeys.add(msgId)
|
||||
for key in expiredKeys:
|
||||
channel.outgoingRepairBuffer.del(key)
|
||||
@ -225,9 +225,9 @@ proc unwrapReceivedMessage*(
|
||||
repairEntry.messageId, rm.config.repairTMax
|
||||
)
|
||||
channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry(
|
||||
entry: repairEntry,
|
||||
inHistEntry: repairEntry,
|
||||
cachedMessage: channel.messageCache[repairEntry.messageId],
|
||||
tResp: now + tResp,
|
||||
minTimeRepairResp: now + tResp,
|
||||
)
|
||||
|
||||
var missingDeps = rm.checkDependencies(msg.causalHistory, channelId)
|
||||
@ -268,8 +268,8 @@ proc unwrapReceivedMessage*(
|
||||
rm.config.repairTMin, rm.config.repairTMax
|
||||
)
|
||||
channel.outgoingRepairBuffer[dep.messageId] = OutgoingRepairEntry(
|
||||
entry: dep,
|
||||
tReq: now + tReq,
|
||||
outHistEntry: dep,
|
||||
minTimeRepairReq: now + tReq,
|
||||
)
|
||||
|
||||
return ok((msg.content, missingDeps, channelId))
|
||||
@ -392,7 +392,7 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
|
||||
# Check incoming repair buffer for expired T_resp (time to rebroadcast)
|
||||
var toRebroadcast: seq[SdsMessageID] = @[]
|
||||
for msgId, entry in channel.incomingRepairBuffer:
|
||||
if now >= entry.tResp:
|
||||
if now >= entry.minTimeRepairResp:
|
||||
toRebroadcast.add(msgId)
|
||||
|
||||
for msgId in toRebroadcast:
|
||||
@ -405,7 +405,7 @@ proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
|
||||
var toRemove: seq[SdsMessageID] = @[]
|
||||
let tMaxDuration = rm.config.repairTMax
|
||||
for msgId, entry in channel.outgoingRepairBuffer:
|
||||
if now - entry.tReq > tMaxDuration:
|
||||
if now - entry.minTimeRepairReq > tMaxDuration:
|
||||
toRemove.add(msgId)
|
||||
for msgId in toRemove:
|
||||
channel.outgoingRepairBuffer.del(msgId)
|
||||
|
||||
@ -11,7 +11,7 @@ proc encodeHistoryEntry*(entry: HistoryEntry): ProtoBuffer =
|
||||
if entry.retrievalHint.len > 0:
|
||||
entryPb.write(2, entry.retrievalHint)
|
||||
if entry.senderId.len > 0:
|
||||
entryPb.write(3, entry.senderId)
|
||||
entryPb.write(3, entry.senderId.string)
|
||||
entryPb.finish()
|
||||
entryPb
|
||||
|
||||
@ -20,7 +20,9 @@ proc decodeHistoryEntry*(entryPb: ProtoBuffer): ProtobufResult[HistoryEntry] =
|
||||
if not ?entryPb.getField(1, entry.messageId):
|
||||
return err(ProtobufError.missingRequiredField("HistoryEntry.messageId"))
|
||||
discard entryPb.getField(2, entry.retrievalHint)
|
||||
discard entryPb.getField(3, entry.senderId)
|
||||
var senderIdStr: string
|
||||
if entryPb.getField(3, senderIdStr).valueOr(false):
|
||||
entry.senderId = senderIdStr.SdsParticipantID
|
||||
ok(entry)
|
||||
|
||||
proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
@ -38,7 +40,7 @@ proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
pb.write(6, msg.bloomFilter)
|
||||
|
||||
if msg.senderId.len > 0:
|
||||
pb.write(7, msg.senderId)
|
||||
pb.write(7, msg.senderId.string)
|
||||
|
||||
for entry in msg.repairRequest:
|
||||
let entryPb = encodeHistoryEntry(entry)
|
||||
@ -85,7 +87,9 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
msg.bloomFilter = @[] # Empty if not present
|
||||
|
||||
# SDS-R: decode senderId (field 7, optional)
|
||||
discard pb.getField(7, msg.senderId)
|
||||
var msgSenderIdStr: string
|
||||
if pb.getField(7, msgSenderIdStr).valueOr(false):
|
||||
msg.senderId = msgSenderIdStr.SdsParticipantID
|
||||
|
||||
# SDS-R: decode repair request (field 13, optional)
|
||||
var repairBuffers: seq[seq[byte]]
|
||||
|
||||
@ -84,7 +84,7 @@ proc computeTReq*(
|
||||
): Duration =
|
||||
## Computes the repair request backoff duration per SDS-R spec:
|
||||
## T_req = hash(participant_id, message_id) % (T_max - T_min) + T_min
|
||||
let h = abs(hash(participantId & messageId))
|
||||
let h = abs(hash(participantId.string & messageId))
|
||||
let rangeMs = tMax.inMilliseconds - tMin.inMilliseconds
|
||||
if rangeMs <= 0:
|
||||
return tMin
|
||||
@ -122,8 +122,8 @@ proc isInResponseGroup*(
|
||||
## hash(participant_id, message_id) % num_groups == hash(sender_id, message_id) % num_groups
|
||||
if numResponseGroups <= 1:
|
||||
return true # All participants in the same group
|
||||
let myGroup = abs(hash(participantId & messageId)) mod numResponseGroups
|
||||
let senderGroup = abs(hash(senderId & messageId)) mod numResponseGroups
|
||||
let myGroup = abs(hash(participantId.string & messageId)) mod numResponseGroups
|
||||
let senderGroup = abs(hash(senderId.string & messageId)) mod numResponseGroups
|
||||
myGroup == senderGroup
|
||||
|
||||
proc getRecentHistoryEntries*(
|
||||
|
||||
@ -1,14 +1,15 @@
|
||||
import ./sds_message_id
|
||||
export sds_message_id
|
||||
|
||||
type HistoryEntry* = object
|
||||
messageId*: SdsMessageID
|
||||
retrievalHint*: seq[byte] ## Optional hint for efficient retrieval (e.g., Waku message hash)
|
||||
senderId*: string ## Original message sender's participant ID (SDS-R)
|
||||
senderId*: SdsParticipantID ## Original message sender's participant ID (SDS-R)
|
||||
|
||||
proc init*(
|
||||
T: type HistoryEntry,
|
||||
messageId: SdsMessageID,
|
||||
retrievalHint: seq[byte] = @[],
|
||||
senderId: string = "",
|
||||
senderId: SdsParticipantID = "".SdsParticipantID,
|
||||
): T =
|
||||
return T(messageId: messageId, retrievalHint: retrievalHint, senderId: senderId)
|
||||
|
||||
@ -23,7 +23,7 @@ type ReliabilityManager* = ref object
|
||||
proc new*(
|
||||
T: type ReliabilityManager,
|
||||
config: ReliabilityConfig,
|
||||
participantId: SdsParticipantID = "",
|
||||
participantId: SdsParticipantID = "".SdsParticipantID,
|
||||
): T =
|
||||
let rm = T(
|
||||
channels: initTable[SdsChannelID, ChannelContext](),
|
||||
|
||||
@ -3,26 +3,34 @@ import ./history_entry
|
||||
export history_entry
|
||||
|
||||
type
|
||||
OutgoingRepairEntry* = object
|
||||
OutgoingRepairEntry* {.requiresInit.} = object
|
||||
## Entry in the outgoing repair request buffer (SDS-R).
|
||||
## Tracks a missing message we want to request repair for.
|
||||
entry*: HistoryEntry ## The missing history entry
|
||||
tReq*: Time ## Timestamp after which we will include this in a repair request
|
||||
outHistEntry*: HistoryEntry ## The missing history entry
|
||||
minTimeRepairReq*: Time
|
||||
## Earliest time at which we will include this in a repair request (T_REQ in spec)
|
||||
|
||||
IncomingRepairEntry* = object
|
||||
IncomingRepairEntry* {.requiresInit.} = object
|
||||
## Entry in the incoming repair request buffer (SDS-R).
|
||||
## Tracks a repair request from a remote peer that we might respond to.
|
||||
entry*: HistoryEntry ## The requested history entry
|
||||
inHistEntry*: HistoryEntry ## The requested history entry
|
||||
cachedMessage*: seq[byte] ## Full serialized SDS message for rebroadcast
|
||||
tResp*: Time ## Timestamp after which we will rebroadcast
|
||||
minTimeRepairResp*: Time
|
||||
## Earliest time at which we will rebroadcast (T_RESP in spec)
|
||||
|
||||
proc init*(T: type OutgoingRepairEntry, entry: HistoryEntry, tReq: Time): T =
|
||||
return T(entry: entry, tReq: tReq)
|
||||
proc init*(
|
||||
T: type OutgoingRepairEntry, outHistEntry: HistoryEntry, minTimeRepairReq: Time
|
||||
): T =
|
||||
return T(outHistEntry: outHistEntry, minTimeRepairReq: minTimeRepairReq)
|
||||
|
||||
proc init*(
|
||||
T: type IncomingRepairEntry,
|
||||
entry: HistoryEntry,
|
||||
inHistEntry: HistoryEntry,
|
||||
cachedMessage: seq[byte],
|
||||
tResp: Time,
|
||||
minTimeRepairResp: Time,
|
||||
): T =
|
||||
return T(entry: entry, cachedMessage: cachedMessage, tResp: tResp)
|
||||
return T(
|
||||
inHistEntry: inHistEntry,
|
||||
cachedMessage: cachedMessage,
|
||||
minTimeRepairResp: minTimeRepairResp,
|
||||
)
|
||||
|
||||
@ -21,7 +21,7 @@ proc init*(
|
||||
channelId: SdsChannelID,
|
||||
content: seq[byte],
|
||||
bloomFilter: seq[byte],
|
||||
senderId: SdsParticipantID = "",
|
||||
senderId: SdsParticipantID = "".SdsParticipantID,
|
||||
repairRequest: seq[HistoryEntry] = @[],
|
||||
): T =
|
||||
return T(
|
||||
|
||||
@ -1,4 +1,11 @@
|
||||
import std/hashes
|
||||
|
||||
type
|
||||
SdsMessageID* = string
|
||||
SdsChannelID* = string
|
||||
SdsParticipantID* = string
|
||||
SdsParticipantID* = distinct string
|
||||
|
||||
proc `==`*(a, b: SdsParticipantID): bool {.borrow.}
|
||||
proc `$`*(p: SdsParticipantID): string {.borrow.}
|
||||
proc len*(p: SdsParticipantID): int {.borrow.}
|
||||
proc hash*(p: SdsParticipantID): Hash {.borrow.}
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
import unittest, results, chronos, std/[times, options, tables]
|
||||
import sds
|
||||
|
||||
# Test-only convenience: implicit string → SdsParticipantID so test fixtures
|
||||
# can use string literals. Production code retains the distinct-type safety.
|
||||
converter toParticipantID(s: string): SdsParticipantID = s.SdsParticipantID
|
||||
|
||||
const testChannel = "testChannel"
|
||||
|
||||
# Core functionality tests
|
||||
@ -895,8 +899,8 @@ suite "SDS-R: Repair Buffer Management":
|
||||
# Manually add an expired repair entry
|
||||
let channel = rm.channels[testChannel]
|
||||
channel.outgoingRepairBuffer["missing-msg"] = OutgoingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"),
|
||||
tReq: getTime() - initDuration(seconds = 10), # Already expired
|
||||
outHistEntry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"),
|
||||
minTimeRepairReq: getTime() - initDuration(seconds = 10), # Already expired
|
||||
)
|
||||
|
||||
# Send a message — should pick up the expired repair request
|
||||
@ -1177,13 +1181,13 @@ suite "SDS-R: Lifecycle and State":
|
||||
let channel = rm.channels[testChannel]
|
||||
|
||||
channel.outgoingRepairBuffer["a"] = OutgoingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "a", senderId: "x"),
|
||||
tReq: getTime(),
|
||||
outHistEntry: HistoryEntry(messageId: "a", senderId: "x"),
|
||||
minTimeRepairReq: getTime(),
|
||||
)
|
||||
channel.incomingRepairBuffer["b"] = IncomingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "b", senderId: "y"),
|
||||
inHistEntry: HistoryEntry(messageId: "b", senderId: "y"),
|
||||
cachedMessage: @[byte(1)],
|
||||
tResp: getTime(),
|
||||
minTimeRepairResp: getTime(),
|
||||
)
|
||||
channel.messageCache["c"] = @[byte(2)]
|
||||
channel.messageSenders["c"] = "someone"
|
||||
@ -1239,9 +1243,9 @@ suite "SDS-R: Lifecycle and State":
|
||||
# Carol already has M1 in history and has a pending incomingRepairBuffer entry
|
||||
channel.messageHistory.add("m1")
|
||||
channel.incomingRepairBuffer["m1"] = IncomingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "m1", senderId: "alice"),
|
||||
inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"),
|
||||
cachedMessage: @[byte(1)],
|
||||
tResp: getTime() + initDuration(seconds = 10),
|
||||
minTimeRepairResp: getTime() + initDuration(seconds = 10),
|
||||
)
|
||||
|
||||
# A rebroadcast of M1 arrives
|
||||
@ -1284,14 +1288,14 @@ suite "SDS-R: Repair Sweep":
|
||||
|
||||
let channel = rm.channels[testChannel]
|
||||
channel.incomingRepairBuffer["m-ready"] = IncomingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "m-ready", senderId: "alice"),
|
||||
inHistEntry: HistoryEntry(messageId: "m-ready", senderId: "alice"),
|
||||
cachedMessage: @[byte(1), 2, 3],
|
||||
tResp: getTime() - initDuration(seconds = 1), # expired
|
||||
minTimeRepairResp: getTime() - initDuration(seconds = 1), # expired
|
||||
)
|
||||
channel.incomingRepairBuffer["m-not-ready"] = IncomingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"),
|
||||
inHistEntry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"),
|
||||
cachedMessage: @[byte(9), 9, 9],
|
||||
tResp: getTime() + initDuration(minutes = 10), # far future
|
||||
minTimeRepairResp: getTime() + initDuration(minutes = 10), # far future
|
||||
)
|
||||
|
||||
rm.runRepairSweep()
|
||||
@ -1312,12 +1316,12 @@ suite "SDS-R: Repair Sweep":
|
||||
let channel = rm.channels[testChannel]
|
||||
let tMax = rm.config.repairTMax
|
||||
channel.outgoingRepairBuffer["m-stale"] = OutgoingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "m-stale", senderId: "alice"),
|
||||
tReq: getTime() - (tMax + tMax), # now - 2*T_max, past drop window
|
||||
outHistEntry: HistoryEntry(messageId: "m-stale", senderId: "alice"),
|
||||
minTimeRepairReq: getTime() - (tMax + tMax), # now - 2*T_max, past drop window
|
||||
)
|
||||
channel.outgoingRepairBuffer["m-fresh"] = OutgoingRepairEntry(
|
||||
entry: HistoryEntry(messageId: "m-fresh", senderId: "alice"),
|
||||
tReq: getTime(),
|
||||
outHistEntry: HistoryEntry(messageId: "m-fresh", senderId: "alice"),
|
||||
minTimeRepairReq: getTime(),
|
||||
)
|
||||
|
||||
rm.runRepairSweep()
|
||||
@ -1411,20 +1415,20 @@ proc broadcast(
|
||||
proc forceOutgoingExpired(
|
||||
rm: ReliabilityManager, messageId: SdsMessageID
|
||||
) =
|
||||
## Push a specific outgoingRepairBuffer entry's tReq into the past so the
|
||||
## Push a specific outgoingRepairBuffer entry's minTimeRepairReq into the past so the
|
||||
## next wrapOutgoingMessage will pick it up.
|
||||
let channel = rm.channels[testChannel]
|
||||
if messageId in channel.outgoingRepairBuffer:
|
||||
channel.outgoingRepairBuffer[messageId].tReq =
|
||||
channel.outgoingRepairBuffer[messageId].minTimeRepairReq =
|
||||
getTime() - initDuration(seconds = 1)
|
||||
|
||||
proc forceIncomingExpired(
|
||||
rm: ReliabilityManager, messageId: SdsMessageID
|
||||
) =
|
||||
## Push an incomingRepairBuffer entry's tResp into the past so runRepairSweep fires it.
|
||||
## Push an incomingRepairBuffer entry's minTimeRepairResp into the past so runRepairSweep fires it.
|
||||
let channel = rm.channels[testChannel]
|
||||
if messageId in channel.incomingRepairBuffer:
|
||||
channel.incomingRepairBuffer[messageId].tResp =
|
||||
channel.incomingRepairBuffer[messageId].minTimeRepairResp =
|
||||
getTime() - initDuration(seconds = 1)
|
||||
|
||||
suite "SDS-R: Multi-Participant Integration":
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user