mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-06-19 23:49:57 +00:00
feat: Implementation of SDS-Repair (#60)
This commit is contained in:
parent
8ee857c908
commit
9d08f5995b
20
library/events/json_repair_ready_event.nim
Normal file
20
library/events/json_repair_ready_event.nim
Normal file
@ -0,0 +1,20 @@
|
||||
import std/[json, base64]
|
||||
import ./json_base_event, sds/[message]
|
||||
|
||||
type JsonRepairReadyEvent* = ref object of JsonEvent
  ## JSON event carrying a message that was handed to the repair-ready
  ## callback, together with the channel it belongs to.
  channelId*: SdsChannelID ## Channel the message belongs to.
  message*: seq[byte] ## Raw message bytes; `$` emits them base64-encoded.

proc new*(
    T: type JsonRepairReadyEvent, message: seq[byte], channelId: SdsChannelID
): T =
  ## Builds an event whose `eventType` is "repair_ready" for `message` on
  ## `channelId`.
  T(eventType: "repair_ready", channelId: channelId, message: message)

method `$`*(jsonRepairReady: JsonRepairReadyEvent): string =
  ## Renders the event as JSON text with the keys `eventType`, `channelId`
  ## and `message`, in that order. `message` is written as base64 text.
  let payload = %*{
    "eventType": jsonRepairReady.eventType,
    "channelId": jsonRepairReady.channelId,
    "message": encode(jsonRepairReady.message)
  }
  $payload
|
||||
@ -16,7 +16,7 @@ import
|
||||
sds,
|
||||
./events/[
|
||||
json_message_ready_event, json_message_sent_event, json_missing_dependencies_event,
|
||||
json_periodic_sync_event,
|
||||
json_periodic_sync_event, json_repair_ready_event,
|
||||
]
|
||||
|
||||
################################################################################
|
||||
@ -114,6 +114,11 @@ proc onPeriodicSync(ctx: ptr SdsContext): PeriodicSyncCallback =
|
||||
callEventCallback(ctx, "onPeriodicSync"):
|
||||
$JsonPeriodicSyncEvent.new()
|
||||
|
||||
proc onRepairReady(ctx: ptr SdsContext): RepairReadyCallback =
  ## Creates the repair-ready callback bound to `ctx`. On each invocation the
  ## message and channel id are wrapped in a `JsonRepairReadyEvent`, and its
  ## JSON text is the body passed to `callEventCallback` under the name
  ## "onRepairReady".
  result = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
    callEventCallback(ctx, "onRepairReady"):
      $JsonRepairReadyEvent.new(message, channelId)
|
||||
|
||||
proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider =
|
||||
return proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} =
|
||||
if isNil(ctx.retrievalHintProvider):
|
||||
@ -196,6 +201,7 @@ proc SdsNewReliabilityManager(
|
||||
missingDependenciesCb: onMissingDependencies(ctx),
|
||||
periodicSyncCb: onPeriodicSync(ctx),
|
||||
retrievalHintProvider: onRetrievalHint(ctx),
|
||||
repairReadyCb: onRepairReady(ctx),
|
||||
)
|
||||
|
||||
let retCode = handleRequest(
|
||||
|
||||
@ -40,7 +40,7 @@ proc createReliabilityManager(
|
||||
rm.setCallbacks(
|
||||
appCallbacks.messageReadyCb, appCallbacks.messageSentCb,
|
||||
appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb,
|
||||
appCallbacks.retrievalHintProvider,
|
||||
appCallbacks.retrievalHintProvider, appCallbacks.repairReadyCb,
|
||||
)
|
||||
|
||||
return ok(rm)
|
||||
|
||||
143
sds.nim
143
sds.nim
@ -1,15 +1,16 @@
|
||||
import std/[times, locks, tables, sets, options]
|
||||
import std/[algorithm, times, locks, tables, sets, options]
|
||||
import chronos, results, chronicles
|
||||
import sds/[types, protobuf, sds_utils, rolling_bloom_filter]
|
||||
|
||||
export types, protobuf, sds_utils, rolling_bloom_filter
|
||||
|
||||
proc newReliabilityManager*(
|
||||
config: ReliabilityConfig = defaultConfig()
|
||||
config: ReliabilityConfig = defaultConfig(),
|
||||
participantId: SdsParticipantID = "".SdsParticipantID,
|
||||
): Result[ReliabilityManager, ReliabilityError] =
|
||||
## Creates a new multi-channel ReliabilityManager.
|
||||
try:
|
||||
let rm = ReliabilityManager.new(config)
|
||||
let rm = ReliabilityManager.new(config, participantId)
|
||||
return ok(rm)
|
||||
except Exception:
|
||||
error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg()
|
||||
@ -88,6 +89,26 @@ proc wrapOutgoingMessage*(
|
||||
error "Failed to serialize bloom filter", channelId = channelId
|
||||
return err(ReliabilityError.reSerializationError)
|
||||
|
||||
# SDS-R: collect eligible expired repair requests to attach. Per
|
||||
# spec (sds-r-send-message, RECOMMENDED), prioritise the entries with
|
||||
# the smallest minTimeRepairReq — they are the most overdue and the
|
||||
# ones the network most needs us to ask about.
|
||||
var repairReqs: seq[HistoryEntry] = @[]
|
||||
let now = getTime()
|
||||
var expiredKeys: seq[SdsMessageID] = @[]
|
||||
var eligible: seq[(SdsMessageID, OutgoingRepairEntry)] = @[]
|
||||
for msgId, repairEntry in channel.outgoingRepairBuffer:
|
||||
if now >= repairEntry.minTimeRepairReq:
|
||||
eligible.add((msgId, repairEntry))
|
||||
eligible.sort do(a, b: (SdsMessageID, OutgoingRepairEntry)) -> int:
|
||||
cmp(a[1].minTimeRepairReq, b[1].minTimeRepairReq)
|
||||
let take = min(eligible.len, rm.config.maxRepairRequests)
|
||||
for i in 0 ..< take:
|
||||
repairReqs.add(eligible[i][1].outHistEntry)
|
||||
expiredKeys.add(eligible[i][0])
|
||||
for key in expiredKeys:
|
||||
channel.outgoingRepairBuffer.del(key)
|
||||
|
||||
let msg = SdsMessage.init(
|
||||
messageId = messageId,
|
||||
lamportTimestamp = channel.lamportTimestamp,
|
||||
@ -95,6 +116,8 @@ proc wrapOutgoingMessage*(
|
||||
channelId = channelId,
|
||||
content = message,
|
||||
bloomFilter = bfResult.get(),
|
||||
senderId = rm.participantId,
|
||||
repairRequest = repairReqs,
|
||||
)
|
||||
|
||||
channel.outgoingBuffer.add(
|
||||
@ -102,7 +125,10 @@ proc wrapOutgoingMessage*(
|
||||
)
|
||||
|
||||
channel.bloomFilter.add(msg.messageId)
|
||||
rm.addToHistory(msg.messageId, channelId)
|
||||
# The full SdsMessage carries senderId and content, so a single
|
||||
# addToHistory replaces the old triple-write to messageHistory,
|
||||
# messageCache, and messageSenders.
|
||||
rm.addToHistory(msg, channelId)
|
||||
|
||||
return serializeMessage(msg)
|
||||
except Exception:
|
||||
@ -133,7 +159,7 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc
|
||||
continue
|
||||
|
||||
if msgId in channel.incomingBuffer:
|
||||
rm.addToHistory(msgId, channelId)
|
||||
rm.addToHistory(channel.incomingBuffer[msgId].message, channelId)
|
||||
if not rm.onMessageReady.isNil():
|
||||
rm.onMessageReady(msgId, channelId)
|
||||
processed.incl(msgId)
|
||||
@ -164,6 +190,11 @@ proc unwrapReceivedMessage*(
|
||||
|
||||
let channel = rm.getOrCreateChannel(channelId)
|
||||
|
||||
# SDS-R: opportunistic repair-buffer cleanup — applies to duplicates too,
|
||||
# so rebroadcasts cancel redundant responses on peers that already have the message.
|
||||
channel.outgoingRepairBuffer.del(msg.messageId)
|
||||
channel.incomingRepairBuffer.del(msg.messageId)
|
||||
|
||||
if msg.messageId in channel.messageHistory:
|
||||
return ok((msg.content, @[], channelId))
|
||||
|
||||
@ -172,6 +203,32 @@ proc unwrapReceivedMessage*(
|
||||
rm.updateLamportTimestamp(msg.lamportTimestamp, channelId)
|
||||
rm.reviewAckStatus(msg)
|
||||
|
||||
# SDS-R: process incoming repair requests from this message. We can only
|
||||
# answer for messages we have actually delivered (i.e. that live in
|
||||
# messageHistory) — buffered-but-undelivered messages are not in a state
|
||||
# to confidently rebroadcast.
|
||||
let now = getTime()
|
||||
for repairEntry in msg.repairRequest:
|
||||
# Remove from our own outgoing repair buffer (someone else is also requesting)
|
||||
channel.outgoingRepairBuffer.del(repairEntry.messageId)
|
||||
if repairEntry.messageId in channel.messageHistory and
|
||||
rm.participantId.len > 0 and repairEntry.senderId.len > 0:
|
||||
if isInResponseGroup(
|
||||
rm.participantId, repairEntry.senderId,
|
||||
repairEntry.messageId, rm.config.numResponseGroups
|
||||
):
|
||||
let serialized = serializeMessage(channel.messageHistory[repairEntry.messageId])
|
||||
if serialized.isOk():
|
||||
let tResp = computeTResp(
|
||||
rm.participantId, repairEntry.senderId,
|
||||
repairEntry.messageId, rm.config.repairTMax
|
||||
)
|
||||
channel.incomingRepairBuffer[repairEntry.messageId] = IncomingRepairEntry(
|
||||
inHistEntry: repairEntry,
|
||||
cachedMessage: serialized.get(),
|
||||
minTimeRepairResp: now + tResp,
|
||||
)
|
||||
|
||||
var missingDeps = rm.checkDependencies(msg.causalHistory, channelId)
|
||||
|
||||
if missingDeps.len == 0:
|
||||
@ -184,7 +241,11 @@ proc unwrapReceivedMessage*(
|
||||
channel.incomingBuffer[msg.messageId] =
|
||||
IncomingMessage.init(message = msg, missingDeps = initHashSet[SdsMessageID]())
|
||||
else:
|
||||
rm.addToHistory(msg.messageId, channelId)
|
||||
rm.addToHistory(msg, channelId)
|
||||
# Unblock any buffered messages that were waiting on this one.
|
||||
for pendingId, entry in channel.incomingBuffer:
|
||||
if msg.messageId in entry.missingDeps:
|
||||
channel.incomingBuffer[pendingId].missingDeps.excl(msg.messageId)
|
||||
rm.processIncomingBuffer(channelId)
|
||||
if not rm.onMessageReady.isNil():
|
||||
rm.onMessageReady(msg.messageId, channelId)
|
||||
@ -197,6 +258,19 @@ proc unwrapReceivedMessage*(
|
||||
if not rm.onMissingDependencies.isNil():
|
||||
rm.onMissingDependencies(msg.messageId, missingDeps, channelId)
|
||||
|
||||
# SDS-R: add missing deps to outgoing repair buffer
|
||||
if rm.participantId.len > 0:
|
||||
for dep in missingDeps:
|
||||
if dep.messageId notin channel.outgoingRepairBuffer:
|
||||
let tReq = computeTReq(
|
||||
rm.participantId, dep.messageId,
|
||||
rm.config.repairTMin, rm.config.repairTMax
|
||||
)
|
||||
channel.outgoingRepairBuffer[dep.messageId] = OutgoingRepairEntry(
|
||||
outHistEntry: dep,
|
||||
minTimeRepairReq: now + tReq,
|
||||
)
|
||||
|
||||
return ok((msg.content, missingDeps, channelId))
|
||||
except Exception:
|
||||
error "Failed to unwrap message", msg = getCurrentExceptionMsg()
|
||||
@ -220,6 +294,10 @@ proc markDependenciesMet*(
|
||||
if msgId in entry.missingDeps:
|
||||
channel.incomingBuffer[pendingId].missingDeps.excl(msgId)
|
||||
|
||||
# SDS-R: clear from repair buffers (dependency now met)
|
||||
channel.outgoingRepairBuffer.del(msgId)
|
||||
channel.incomingRepairBuffer.del(msgId)
|
||||
|
||||
rm.processIncomingBuffer(channelId)
|
||||
return ok()
|
||||
except Exception:
|
||||
@ -234,6 +312,7 @@ proc setCallbacks*(
|
||||
onMissingDependencies: MissingDependenciesCallback,
|
||||
onPeriodicSync: PeriodicSyncCallback = nil,
|
||||
onRetrievalHint: RetrievalHintProvider = nil,
|
||||
onRepairReady: RepairReadyCallback = nil,
|
||||
) =
|
||||
## Sets the callback functions for various events in the ReliabilityManager.
|
||||
withLock rm.lock:
|
||||
@ -242,6 +321,7 @@ proc setCallbacks*(
|
||||
rm.onMissingDependencies = onMissingDependencies
|
||||
rm.onPeriodicSync = onPeriodicSync
|
||||
rm.onRetrievalHint = onRetrievalHint
|
||||
rm.onRepairReady = onRepairReady
|
||||
|
||||
proc checkUnacknowledgedMessages(
|
||||
rm: ReliabilityManager, channelId: SdsChannelID
|
||||
@ -299,10 +379,57 @@ proc periodicSyncMessage(
|
||||
error "Error in periodic sync", msg = getCurrentExceptionMsg()
|
||||
await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds))
|
||||
|
||||
proc runRepairSweep*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
  ## SDS-R: runs a single pass of the repair sweep over every channel.
  ## - Incoming: every entry whose `minTimeRepairResp` has passed is removed
  ##   from `incomingRepairBuffer` and its cached message is handed to
  ##   `onRepairReady` (when a callback is set).
  ## - Outgoing: every entry that is more than `repairTMax` past its
  ##   `minTimeRepairReq` is dropped from `outgoingRepairBuffer`.
  ## Exposed so it can be driven directly in tests; also invoked by
  ## periodicRepairSweep.
  ##
  ## rm.lock is held for the whole pass so the repair buffers are not
  ## observed mid-mutation by other code that takes the same lock.
  ## NOTE(review): `onRepairReady` runs while the lock is held; a callback
  ## that re-enters a proc acquiring rm.lock would presumably block - confirm.
  ##
  ## Never raises: a failing callback is logged and the sweep continues with
  ## the next entry, so one bad callback cannot delay the remaining
  ## rebroadcasts or skip the outgoing-buffer pruning for that channel.
  withLock rm.lock:
    try:
      let now = getTime()
      let tMaxDuration = rm.config.repairTMax
      for channelId, channel in rm.channels:
        try:
          # Collect first, mutate afterwards: a Table must not be modified
          # while it is being iterated.
          var toRebroadcast: seq[SdsMessageID] = @[]
          for msgId, entry in channel.incomingRepairBuffer:
            if now >= entry.minTimeRepairResp:
              toRebroadcast.add(msgId)

          for msgId in toRebroadcast:
            let entry = channel.incomingRepairBuffer[msgId]
            # The entry is removed before the callback fires, so it is
            # delivered at most once even if the callback fails.
            channel.incomingRepairBuffer.del(msgId)
            if not rm.onRepairReady.isNil():
              try:
                rm.onRepairReady(entry.cachedMessage, channelId)
              except Exception:
                error "Error in repair-ready callback",
                  channelId = channelId,
                  msgId = msgId,
                  msg = getCurrentExceptionMsg()

          # Drop outgoing repair entries that are past the T_max window.
          var toRemove: seq[SdsMessageID] = @[]
          for msgId, entry in channel.outgoingRepairBuffer:
            if now - entry.minTimeRepairReq > tMaxDuration:
              toRemove.add(msgId)
          for msgId in toRemove:
            channel.outgoingRepairBuffer.del(msgId)
        except Exception:
          error "Error in repair sweep for channel",
            channelId = channelId, msg = getCurrentExceptionMsg()
    except Exception:
      error "Error in repair sweep", msg = getCurrentExceptionMsg()
|
||||
|
||||
proc periodicRepairSweep(
    rm: ReliabilityManager
) {.async: (raises: [CancelledError]), gcsafe.} =
  ## SDS-R: endless loop that runs one repair sweep and then sleeps for
  ## `rm.config.repairSweepInterval` before the next one. The interval is
  ## re-read on every iteration.
  while true:
    runRepairSweep(rm)
    let pauseMs = rm.config.repairSweepInterval.inMilliseconds
    await sleepAsync(chronos.milliseconds(pauseMs))
|
||||
|
||||
proc startPeriodicTasks*(rm: ReliabilityManager) =
  ## Spawns the manager's background loops, in this order: the buffer sweep,
  ## the periodic sync message sender, and the SDS-R repair sweep.
  asyncSpawn periodicBufferSweep(rm)
  asyncSpawn periodicSyncMessage(rm)
  asyncSpawn periodicRepairSweep(rm)
|
||||
|
||||
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] =
|
||||
## Resets the ReliabilityManager to its initial state.
|
||||
@ -310,9 +437,11 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE
|
||||
try:
|
||||
for channelId, channel in rm.channels:
|
||||
channel.lamportTimestamp = 0
|
||||
channel.messageHistory.setLen(0)
|
||||
channel.messageHistory.clear()
|
||||
channel.outgoingBuffer.setLen(0)
|
||||
channel.incomingBuffer.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
channel.bloomFilter =
|
||||
RollingBloomFilter.init(rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate)
|
||||
rm.channels.clear()
|
||||
|
||||
@ -3,6 +3,7 @@ import ./types/history_entry
|
||||
import ./types/sds_message
|
||||
import ./types/unacknowledged_message
|
||||
import ./types/incoming_message
|
||||
import ./types/repair_entry
|
||||
import ./types/reliability_config
|
||||
|
||||
export
|
||||
@ -11,4 +12,5 @@ export
|
||||
sds_message,
|
||||
unacknowledged_message,
|
||||
incoming_message,
|
||||
repair_entry,
|
||||
reliability_config
|
||||
|
||||
@ -5,6 +5,26 @@ import ./protobufutil
|
||||
import ./bloom
|
||||
import ./sds_utils
|
||||
|
||||
proc encodeHistoryEntry*(entry: HistoryEntry): ProtoBuffer =
  ## Serializes `entry` into a finished ProtoBuffer:
  ## - field 1: messageId (always written)
  ## - field 2: retrievalHint (written only when non-empty)
  ## - field 3: senderId as a string (written only when non-empty)
  result = initProtoBuffer()
  result.write(1, entry.messageId)
  if entry.retrievalHint.len != 0:
    result.write(2, entry.retrievalHint)
  if entry.senderId.len != 0:
    result.write(3, string(entry.senderId))
  result.finish()
|
||||
|
||||
proc decodeHistoryEntry*(entryPb: ProtoBuffer): ProtobufResult[HistoryEntry] =
  ## Parses a HistoryEntry out of `entryPb`.
  ## Field 1 (messageId) is required: a read error is propagated, and an
  ## absent field yields a missing-required-field error.
  ## Field 2 (retrievalHint) and field 3 (senderId) are optional.
  var decoded = HistoryEntry.init("")

  let hasMessageId = ?entryPb.getField(1, decoded.messageId)
  if not hasMessageId:
    return err(ProtobufError.missingRequiredField("HistoryEntry.messageId"))

  # Optional field: the outcome of the read is deliberately ignored.
  discard entryPb.getField(2, decoded.retrievalHint)

  # Optional field: senderId is only set when the field was read successfully
  # and reported as present; a read error is treated like an absent field.
  var rawSenderId: string
  let hasSender = entryPb.getField(3, rawSenderId).valueOr(false)
  if hasSender:
    decoded.senderId = SdsParticipantID(rawSenderId)

  return ok(decoded)
|
||||
|
||||
proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
var pb = initProtoBuffer()
|
||||
|
||||
@ -12,16 +32,20 @@ proc encode*(msg: SdsMessage): ProtoBuffer =
|
||||
pb.write(2, uint64(msg.lamportTimestamp))
|
||||
|
||||
for entry in msg.causalHistory:
|
||||
var entryPb = initProtoBuffer()
|
||||
entryPb.write(1, entry.messageId)
|
||||
if entry.retrievalHint.len > 0:
|
||||
entryPb.write(2, entry.retrievalHint)
|
||||
entryPb.finish()
|
||||
let entryPb = encodeHistoryEntry(entry)
|
||||
pb.write(3, entryPb.buffer)
|
||||
|
||||
pb.write(4, msg.channelId)
|
||||
pb.write(5, msg.content)
|
||||
pb.write(6, msg.bloomFilter)
|
||||
|
||||
if msg.senderId.len > 0:
|
||||
pb.write(7, msg.senderId.string)
|
||||
|
||||
for entry in msg.repairRequest:
|
||||
let entryPb = encodeHistoryEntry(entry)
|
||||
pb.write(13, entryPb.buffer)
|
||||
|
||||
pb.finish()
|
||||
|
||||
return pb
|
||||
@ -44,11 +68,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
# New format: repeated HistoryEntry
|
||||
for histBuffer in historyBuffers:
|
||||
let entryPb = initProtoBuffer(histBuffer)
|
||||
var entry = HistoryEntry.init("")
|
||||
if not ?entryPb.getField(1, entry.messageId):
|
||||
return err(ProtobufError.missingRequiredField("HistoryEntry.messageId"))
|
||||
# retrievalHint is optional
|
||||
discard entryPb.getField(2, entry.retrievalHint)
|
||||
let entry = ?decodeHistoryEntry(entryPb)
|
||||
msg.causalHistory.add(entry)
|
||||
else:
|
||||
# Try old format: repeated string
|
||||
@ -66,6 +86,19 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
if not ?pb.getField(6, msg.bloomFilter):
|
||||
msg.bloomFilter = @[] # Empty if not present
|
||||
|
||||
# SDS-R: decode senderId (field 7, optional)
|
||||
var msgSenderIdStr: string
|
||||
if pb.getField(7, msgSenderIdStr).valueOr(false):
|
||||
msg.senderId = msgSenderIdStr.SdsParticipantID
|
||||
|
||||
# SDS-R: decode repair request (field 13, optional)
|
||||
var repairBuffers: seq[seq[byte]]
|
||||
if pb.getRepeatedField(13, repairBuffers).isOk():
|
||||
for repairBuffer in repairBuffers:
|
||||
let entryPb = initProtoBuffer(repairBuffer)
|
||||
let entry = ?decodeHistoryEntry(entryPb)
|
||||
msg.repairRequest.add(entry)
|
||||
|
||||
return ok(msg)
|
||||
|
||||
proc extractChannelId*(data: seq[byte]): Result[SdsChannelID, ReliabilityError] =
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
import std/[locks, tables, sequtils]
|
||||
import std/[times, locks, tables, sequtils, hashes]
|
||||
import chronicles, results
|
||||
import ./rolling_bloom_filter
|
||||
import ./types/[
|
||||
sds_message_id, history_entry, sds_message, unacknowledged_message, incoming_message,
|
||||
reliability_error, callbacks, app_callbacks, reliability_config, channel_context,
|
||||
reliability_manager,
|
||||
reliability_error, callbacks, app_callbacks, reliability_config, repair_entry,
|
||||
channel_context, reliability_manager,
|
||||
]
|
||||
export
|
||||
sds_message_id, history_entry, sds_message, unacknowledged_message, incoming_message,
|
||||
reliability_error, callbacks, app_callbacks, reliability_config, channel_context,
|
||||
reliability_manager
|
||||
reliability_error, callbacks, app_callbacks, reliability_config, repair_entry,
|
||||
channel_context, reliability_manager
|
||||
|
||||
proc defaultConfig*(): ReliabilityConfig =
  ## Returns a ReliabilityConfig built by `ReliabilityConfig.init()` with no
  ## arguments, i.e. every parameter left at its default.
  ReliabilityConfig.init()
|
||||
@ -21,7 +21,9 @@ proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
|
||||
for channelId, channel in rm.channels:
|
||||
channel.outgoingBuffer.setLen(0)
|
||||
channel.incomingBuffer.clear()
|
||||
channel.messageHistory.setLen(0)
|
||||
channel.messageHistory.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
rm.channels.clear()
|
||||
except Exception:
|
||||
error "Error during cleanup", error = getCurrentExceptionMsg()
|
||||
@ -38,17 +40,25 @@ proc cleanBloomFilter*(
|
||||
error = getCurrentExceptionMsg(), channelId = channelId
|
||||
|
||||
proc addToHistory*(
|
||||
rm: ReliabilityManager, msgId: SdsMessageID, channelId: SdsChannelID
|
||||
rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID
|
||||
) {.gcsafe, raises: [].} =
|
||||
## Inserts a delivered message into the channel's history map and evicts the
|
||||
## eldest entries when the bound is exceeded. The full SdsMessage is kept so
|
||||
## senderId is available for downstream causal-history population and the
|
||||
## bytes can be re-serialized on demand to answer SDS-R repair requests.
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
let channel = rm.channels[channelId]
|
||||
channel.messageHistory.add(msgId)
|
||||
if channel.messageHistory.len > rm.config.maxMessageHistory:
|
||||
channel.messageHistory.delete(0)
|
||||
channel.messageHistory[msg.messageId] = msg
|
||||
while channel.messageHistory.len > rm.config.maxMessageHistory:
|
||||
var firstKey: SdsMessageID
|
||||
for k in channel.messageHistory.keys:
|
||||
firstKey = k
|
||||
break
|
||||
channel.messageHistory.del(firstKey)
|
||||
except Exception:
|
||||
error "Failed to add to history",
|
||||
channelId = channelId, msgId = msgId, error = getCurrentExceptionMsg()
|
||||
channelId = channelId, msgId = msg.messageId, error = getCurrentExceptionMsg()
|
||||
|
||||
proc updateLamportTimestamp*(
|
||||
rm: ReliabilityManager, msgTs: int64, channelId: SdsChannelID
|
||||
@ -70,21 +80,79 @@ proc toCausalHistory*(messageIds: seq[SdsMessageID]): seq[HistoryEntry] =
|
||||
proc getMessageIds*(causalHistory: seq[HistoryEntry]): seq[SdsMessageID] =
  ## Returns the `messageId` of every entry in `causalHistory`, in the same
  ## order as the input.
  result = newSeqOfCap[SdsMessageID](causalHistory.len)
  for entry in causalHistory:
    result.add(entry.messageId)
|
||||
|
||||
## SDS-R: Repair computation functions
|
||||
|
||||
proc computeTReq*(
    participantId: SdsParticipantID,
    messageId: SdsMessageID,
    tMin: Duration,
    tMax: Duration,
): Duration =
  ## Computes the repair request backoff duration per SDS-R spec:
  ##   T_req = hash(participant_id, message_id) % (T_max - T_min) + T_min
  ## The hash input is the participant id concatenated with the message id,
  ## and the arithmetic is done in whole milliseconds.
  ## Returns `tMin` unchanged when `tMax - tMin` is not positive; otherwise a
  ## duration in `[tMin, tMax)` at millisecond resolution.
  let rangeMs = tMax.inMilliseconds - tMin.inMilliseconds
  if rangeMs <= 0:
    return tMin

  let rawHash = hash(participantId.string & messageId)
  # abs(low(Hash)) overflows: it raises with overflow checks on and stays
  # negative with them off, which would give a negative offset. Map that one
  # value to high(Hash); every other hash keeps its previous result.
  let h =
    if rawHash == low(Hash):
      high(Hash)
    else:
      abs(rawHash)

  let offsetMs = int64(h) mod rangeMs
  initDuration(milliseconds = tMin.inMilliseconds + offsetMs)
|
||||
|
||||
proc computeTResp*(
    participantId: SdsParticipantID,
    senderId: SdsParticipantID,
    messageId: SdsMessageID,
    tMax: Duration,
): Duration =
  ## Computes the repair response backoff duration per SDS-R spec:
  ##   distance = hash(participant_id) XOR hash(sender_id)
  ##   T_resp = distance * hash(message_id) % T_max
  ## The arithmetic is done in whole milliseconds. Returns a zero duration
  ## when `tMax` is not positive or when the distance is 0; equal ids hash
  ## equally, so the original sender always gets T_resp = 0.
  let tMaxMs = tMax.inMilliseconds
  let rawDistance = hash(participantId) xor hash(senderId)
  if tMaxMs <= 0 or rawDistance == 0:
    return initDuration(milliseconds = 0)

  # abs(low(Hash)) overflows: it raises with overflow checks on and stays
  # negative with them off. Map that one value to high(Hash); every other
  # input keeps its previous result.
  let distance =
    if rawDistance == low(Hash):
      high(Hash)
    else:
      abs(rawDistance)
  let rawMsgHash = hash(messageId)
  let msgHash =
    if rawMsgHash == low(Hash):
      high(Hash)
    else:
      abs(rawMsgHash)

  # Reduce both factors modulo T_max before multiplying, and multiply as
  # uint64, so the product cannot trap on signed overflow.
  let d = uint64(int64(distance) mod tMaxMs)
  let m = uint64(int64(msgHash) mod tMaxMs)
  let offsetMs = int64((d * m) mod uint64(tMaxMs))
  initDuration(milliseconds = offsetMs)
|
||||
|
||||
proc isInResponseGroup*(
    participantId: SdsParticipantID,
    senderId: SdsParticipantID,
    messageId: SdsMessageID,
    numResponseGroups: int,
): bool =
  ## Determines if this participant is in the response group for a given
  ## message per SDS-R spec:
  ##   hash(participant_id, message_id) % num_groups ==
  ##     hash(sender_id, message_id) % num_groups
  ## Each hash input is the respective id concatenated with the message id.
  ## Returns true unconditionally when `numResponseGroups <= 1`, since all
  ## participants are then in the same group.
  if numResponseGroups <= 1:
    return true

  let rawMine = hash(participantId.string & messageId)
  let rawSender = hash(senderId.string & messageId)
  # abs(low(Hash)) overflows: it raises with overflow checks on and stays
  # negative with them off. Map that one value to high(Hash); every other
  # hash keeps its previous group.
  let mine =
    if rawMine == low(Hash):
      high(Hash)
    else:
      abs(rawMine)
  let sender =
    if rawSender == low(Hash):
      high(Hash)
    else:
      abs(rawSender)

  (mine mod numResponseGroups) == (sender mod numResponseGroups)
|
||||
|
||||
proc getRecentHistoryEntries*(
|
||||
rm: ReliabilityManager, n: int, channelId: SdsChannelID
|
||||
): seq[HistoryEntry] =
|
||||
## Get recent history entries for sending in causal history.
|
||||
## Populates retrieval hints and senderId (SDS-R) for each entry.
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
let channel = rm.channels[channelId]
|
||||
let recentMessageIds = channel.messageHistory[max(0, channel.messageHistory.len - n) .. ^1]
|
||||
if rm.onRetrievalHint.isNil():
|
||||
return toCausalHistory(recentMessageIds)
|
||||
else:
|
||||
var entries: seq[HistoryEntry] = @[]
|
||||
for msgId in recentMessageIds:
|
||||
let hint = rm.onRetrievalHint(msgId)
|
||||
entries.add(newHistoryEntry(msgId, hint))
|
||||
return entries
|
||||
var orderedIds: seq[SdsMessageID] = @[]
|
||||
for msgId in channel.messageHistory.keys:
|
||||
orderedIds.add(msgId)
|
||||
let recentMessageIds =
|
||||
orderedIds[max(0, orderedIds.len - n) .. ^1]
|
||||
var entries: seq[HistoryEntry] = @[]
|
||||
for msgId in recentMessageIds:
|
||||
var entry = HistoryEntry(messageId: msgId)
|
||||
if not rm.onRetrievalHint.isNil():
|
||||
entry.retrievalHint = rm.onRetrievalHint(msgId)
|
||||
entry.senderId = channel.messageHistory[msgId].senderId
|
||||
entries.add(entry)
|
||||
return entries
|
||||
else:
|
||||
return @[]
|
||||
except Exception:
|
||||
@ -116,7 +184,10 @@ proc getMessageHistory*(
|
||||
withLock rm.lock:
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
return rm.channels[channelId].messageHistory
|
||||
var ids: seq[SdsMessageID] = @[]
|
||||
for msgId in rm.channels[channelId].messageHistory.keys:
|
||||
ids.add(msgId)
|
||||
return ids
|
||||
else:
|
||||
return @[]
|
||||
except Exception:
|
||||
@ -187,7 +258,9 @@ proc removeChannel*(
|
||||
let channel = rm.channels[channelId]
|
||||
channel.outgoingBuffer.setLen(0)
|
||||
channel.incomingBuffer.clear()
|
||||
channel.messageHistory.setLen(0)
|
||||
channel.messageHistory.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
rm.channels.del(channelId)
|
||||
return ok()
|
||||
except Exception:
|
||||
|
||||
@ -9,6 +9,7 @@ import sds/types/reliability_error
|
||||
import sds/types/callbacks
|
||||
import sds/types/app_callbacks
|
||||
import sds/types/reliability_config
|
||||
import sds/types/repair_entry
|
||||
import sds/types/channel_context
|
||||
import sds/types/reliability_manager
|
||||
import sds/types/protobuf_error
|
||||
@ -25,6 +26,7 @@ export
|
||||
callbacks,
|
||||
app_callbacks,
|
||||
reliability_config,
|
||||
repair_entry,
|
||||
channel_context,
|
||||
reliability_manager,
|
||||
protobuf_error
|
||||
|
||||
@ -7,6 +7,7 @@ type AppCallbacks* = ref object
|
||||
missingDependenciesCb*: MissingDependenciesCallback
|
||||
periodicSyncCb*: PeriodicSyncCallback
|
||||
retrievalHintProvider*: RetrievalHintProvider
|
||||
repairReadyCb*: RepairReadyCallback
|
||||
|
||||
proc new*(
|
||||
T: type AppCallbacks,
|
||||
@ -15,6 +16,7 @@ proc new*(
|
||||
missingDependenciesCb: MissingDependenciesCallback = nil,
|
||||
periodicSyncCb: PeriodicSyncCallback = nil,
|
||||
retrievalHintProvider: RetrievalHintProvider = nil,
|
||||
repairReadyCb: RepairReadyCallback = nil,
|
||||
): T =
|
||||
return T(
|
||||
messageReadyCb: messageReadyCb,
|
||||
@ -22,4 +24,5 @@ proc new*(
|
||||
missingDependenciesCb: missingDependenciesCb,
|
||||
periodicSyncCb: periodicSyncCb,
|
||||
retrievalHintProvider: retrievalHintProvider,
|
||||
repairReadyCb: repairReadyCb,
|
||||
)
|
||||
|
||||
@ -16,3 +16,5 @@ type
|
||||
RetrievalHintProvider* = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.}
|
||||
|
||||
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
|
||||
|
||||
RepairReadyCallback* = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.}
|
||||
|
||||
@ -1,22 +1,36 @@
|
||||
import std/tables
|
||||
import ./sds_message_id
|
||||
import ./sds_message
|
||||
import ./rolling_bloom_filter
|
||||
import ./unacknowledged_message
|
||||
import ./incoming_message
|
||||
export sds_message_id, rolling_bloom_filter, unacknowledged_message, incoming_message
|
||||
import ./repair_entry
|
||||
export
|
||||
sds_message_id, sds_message, rolling_bloom_filter, unacknowledged_message,
|
||||
incoming_message, repair_entry
|
||||
|
||||
type ChannelContext* = ref object
|
||||
lamportTimestamp*: int64
|
||||
messageHistory*: seq[SdsMessageID]
|
||||
messageHistory*: OrderedTable[SdsMessageID, SdsMessage]
|
||||
## Single source of truth for delivered messages. Holds the deserialized
|
||||
## SdsMessage (which carries senderId, lamportTimestamp, content, etc.) so
|
||||
## causal history, sender lookup, and SDS-R repair responses can all be
|
||||
## answered from one place. OrderedTable preserves insertion order for
|
||||
## causal-history tail access and FIFO eviction at maxMessageHistory.
|
||||
bloomFilter*: RollingBloomFilter
|
||||
outgoingBuffer*: seq[UnacknowledgedMessage]
|
||||
incomingBuffer*: Table[SdsMessageID, IncomingMessage]
|
||||
## SDS-R buffers
|
||||
outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry]
|
||||
incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry]
|
||||
|
||||
proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T =
|
||||
return T(
|
||||
lamportTimestamp: 0,
|
||||
messageHistory: @[],
|
||||
messageHistory: initOrderedTable[SdsMessageID, SdsMessage](),
|
||||
bloomFilter: bloomFilter,
|
||||
outgoingBuffer: @[],
|
||||
incomingBuffer: initTable[SdsMessageID, IncomingMessage](),
|
||||
outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](),
|
||||
incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](),
|
||||
)
|
||||
|
||||
@ -1,8 +1,15 @@
|
||||
import ./sds_message_id
|
||||
export sds_message_id
|
||||
|
||||
type HistoryEntry* = object
|
||||
messageId*: SdsMessageID
|
||||
retrievalHint*: seq[byte] ## Optional hint for efficient retrieval (e.g., Waku message hash)
|
||||
senderId*: SdsParticipantID ## Original message sender's participant ID (SDS-R)
|
||||
|
||||
proc init*(T: type HistoryEntry, messageId: SdsMessageID, retrievalHint: seq[byte] = @[]): T =
|
||||
return T(messageId: messageId, retrievalHint: retrievalHint)
|
||||
proc init*(
|
||||
T: type HistoryEntry,
|
||||
messageId: SdsMessageID,
|
||||
retrievalHint: seq[byte] = @[],
|
||||
senderId: SdsParticipantID = "".SdsParticipantID,
|
||||
): T =
|
||||
return T(messageId: messageId, retrievalHint: retrievalHint, senderId: senderId)
|
||||
|
||||
@ -7,6 +7,11 @@ const
|
||||
DefaultMaxResendAttempts* = 5
|
||||
DefaultSyncMessageInterval* = initDuration(seconds = 30)
|
||||
DefaultBufferSweepInterval* = initDuration(seconds = 60)
|
||||
DefaultRepairTMin* = initDuration(seconds = 30)
|
||||
DefaultRepairTMax* = initDuration(seconds = 300)
|
||||
DefaultNumResponseGroups* = 1
|
||||
DefaultMaxRepairRequests* = 3
|
||||
DefaultRepairSweepInterval* = initDuration(seconds = 5)
|
||||
MaxMessageSize* = 1024 * 1024 # 1 MB
|
||||
|
||||
import ./rolling_bloom_filter
|
||||
@ -21,6 +26,12 @@ type ReliabilityConfig* {.requiresInit.} = object
|
||||
maxResendAttempts*: int
|
||||
syncMessageInterval*: Duration
|
||||
bufferSweepInterval*: Duration
|
||||
## SDS-R config
|
||||
repairTMin*: Duration
|
||||
repairTMax*: Duration
|
||||
numResponseGroups*: int
|
||||
maxRepairRequests*: int
|
||||
repairSweepInterval*: Duration
|
||||
|
||||
proc init*(
|
||||
T: type ReliabilityConfig,
|
||||
@ -32,6 +43,11 @@ proc init*(
|
||||
maxResendAttempts: int = DefaultMaxResendAttempts,
|
||||
syncMessageInterval: Duration = DefaultSyncMessageInterval,
|
||||
bufferSweepInterval: Duration = DefaultBufferSweepInterval,
|
||||
repairTMin: Duration = DefaultRepairTMin,
|
||||
repairTMax: Duration = DefaultRepairTMax,
|
||||
numResponseGroups: int = DefaultNumResponseGroups,
|
||||
maxRepairRequests: int = DefaultMaxRepairRequests,
|
||||
repairSweepInterval: Duration = DefaultRepairSweepInterval,
|
||||
): T =
|
||||
return T(
|
||||
bloomFilterCapacity: bloomFilterCapacity,
|
||||
@ -42,4 +58,9 @@ proc init*(
|
||||
maxResendAttempts: maxResendAttempts,
|
||||
syncMessageInterval: syncMessageInterval,
|
||||
bufferSweepInterval: bufferSweepInterval,
|
||||
repairTMin: repairTMin,
|
||||
repairTMax: repairTMax,
|
||||
numResponseGroups: numResponseGroups,
|
||||
maxRepairRequests: maxRepairRequests,
|
||||
repairSweepInterval: repairSweepInterval,
|
||||
)
|
||||
|
||||
@ -9,6 +9,7 @@ export sds_message_id, history_entry, callbacks, reliability_config, channel_con
|
||||
type ReliabilityManager* = ref object
|
||||
channels*: Table[SdsChannelID, ChannelContext]
|
||||
config*: ReliabilityConfig
|
||||
participantId*: SdsParticipantID
|
||||
lock*: Lock
|
||||
onMessageReady*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
|
||||
onMessageSent*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
|
||||
@ -17,11 +18,17 @@ type ReliabilityManager* = ref object
|
||||
) {.gcsafe.}
|
||||
onPeriodicSync*: PeriodicSyncCallback
|
||||
onRetrievalHint*: RetrievalHintProvider
|
||||
onRepairReady*: RepairReadyCallback
|
||||
|
||||
proc new*(T: type ReliabilityManager, config: ReliabilityConfig): T =
|
||||
proc new*(
|
||||
T: type ReliabilityManager,
|
||||
config: ReliabilityConfig,
|
||||
participantId: SdsParticipantID = "".SdsParticipantID,
|
||||
): T =
|
||||
let rm = T(
|
||||
channels: initTable[SdsChannelID, ChannelContext](),
|
||||
config: config,
|
||||
participantId: participantId,
|
||||
)
|
||||
rm.lock.initLock()
|
||||
return rm
|
||||
|
||||
36
sds/types/repair_entry.nim
Normal file
36
sds/types/repair_entry.nim
Normal file
@ -0,0 +1,36 @@
|
||||
import std/times
|
||||
import ./history_entry
|
||||
export history_entry
|
||||
|
||||
type
  OutgoingRepairEntry* {.requiresInit.} = object
    ## SDS-R outgoing repair buffer entry: a missing message that this
    ## participant wants to request repair for.
    outHistEntry*: HistoryEntry ## History entry of the missing message.
    minTimeRepairReq*: Time
      ## Earliest time at which the entry may be included in a repair
      ## request (T_REQ in the spec).

  IncomingRepairEntry* {.requiresInit.} = object
    ## SDS-R incoming repair buffer entry: a repair request from a remote
    ## peer that this participant might answer.
    inHistEntry*: HistoryEntry ## History entry that was requested.
    cachedMessage*: seq[byte] ## Full serialized SDS message for rebroadcast.
    minTimeRepairResp*: Time
      ## Earliest time at which the message will be rebroadcast (T_RESP in
      ## the spec).

proc init*(
    T: type OutgoingRepairEntry, outHistEntry: HistoryEntry, minTimeRepairReq: Time
): T =
  ## Builds an OutgoingRepairEntry from its two fields.
  T(outHistEntry: outHistEntry, minTimeRepairReq: minTimeRepairReq)

proc init*(
    T: type IncomingRepairEntry,
    inHistEntry: HistoryEntry,
    cachedMessage: seq[byte],
    minTimeRepairResp: Time,
): T =
  ## Builds an IncomingRepairEntry from its three fields.
  T(
    inHistEntry: inHistEntry,
    cachedMessage: cachedMessage,
    minTimeRepairResp: minTimeRepairResp,
  )
|
||||
@ -9,6 +9,9 @@ type SdsMessage* {.requiresInit.} = object
|
||||
channelId*: SdsChannelID
|
||||
content*: seq[byte]
|
||||
bloomFilter*: seq[byte]
|
||||
senderId*: SdsParticipantID ## SDS-R: original sender's participant ID
|
||||
repairRequest*: seq[HistoryEntry]
|
||||
## Capped list of missing entries requesting repair (SDS-R)
|
||||
|
||||
proc init*(
|
||||
T: type SdsMessage,
|
||||
@ -18,6 +21,8 @@ proc init*(
|
||||
channelId: SdsChannelID,
|
||||
content: seq[byte],
|
||||
bloomFilter: seq[byte],
|
||||
senderId: SdsParticipantID = "".SdsParticipantID,
|
||||
repairRequest: seq[HistoryEntry] = @[],
|
||||
): T =
|
||||
return T(
|
||||
messageId: messageId,
|
||||
@ -26,4 +31,6 @@ proc init*(
|
||||
channelId: channelId,
|
||||
content: content,
|
||||
bloomFilter: bloomFilter,
|
||||
senderId: senderId,
|
||||
repairRequest: repairRequest,
|
||||
)
|
||||
|
||||
@ -1,3 +1,11 @@
|
||||
import std/hashes
|
||||
|
||||
type
  SdsMessageID* = string ## Message identifier; a plain string alias.
  SdsChannelID* = string ## Channel identifier; a plain string alias.
  SdsParticipantID* = distinct string
    ## Participant identifier. Distinct from `string`, so it is not
    ## interchangeable with message or channel ids without a conversion.

# Operations borrowed from `string` for the distinct participant id type.
proc `==`*(a, b: SdsParticipantID): bool {.borrow.} ## Equality of two ids.
proc `$`*(p: SdsParticipantID): string {.borrow.} ## String form of the id.
proc len*(p: SdsParticipantID): int {.borrow.} ## Length of the id.
proc hash*(p: SdsParticipantID): Hash {.borrow.} ## Hash of the id.
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user