From 7ba27717f1615d81cb3ca9b4b74db4c4a4628a0c Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Mon, 14 Oct 2024 15:05:19 +0400 Subject: [PATCH] init draft --- reliability.nimble | 14 +++ src/reliability.nim | 252 +++++++++++++++++++++++++++++++++++++ tests/test_reliability.nim | 79 ++++++++++++ 3 files changed, 345 insertions(+) create mode 100644 reliability.nimble create mode 100644 src/reliability.nim create mode 100644 tests/test_reliability.nim diff --git a/reliability.nimble b/reliability.nimble new file mode 100644 index 0000000..cb7d48d --- /dev/null +++ b/reliability.nimble @@ -0,0 +1,14 @@ +# Package +version = "0.1.0" +author = "Waku Team" +description = "E2E Reliability Protocol API" +license = "MIT" +srcDir = "src" + +# Dependencies +requires "nim >= 1.6.0" +requires "nimsha2" +requires "chronicles" + +task test, "Run the test suite": + exec "nim c -r tests/test_reliability.nim" \ No newline at end of file diff --git a/src/reliability.nim b/src/reliability.nim new file mode 100644 index 0000000..9805a22 --- /dev/null +++ b/src/reliability.nim @@ -0,0 +1,252 @@ +import std/[times, sets, hashes, random, sequtils, algorithm] +import nimsha2 +import chronicles + +type + MessageID* = string + + Message* = object + senderId*: string + messageId*: MessageID + lamportTimestamp*: int64 + causalHistory*: seq[MessageID] + channelId*: string + content*: string + bloomFilter*: seq[byte] + + UnacknowledgedMessage* = object + message*: Message + sendTime*: Time + resendAttempts*: int + + TimestampedMessageID* = object + id*: MessageID + timestamp*: Time + + RollingBloomFilter* = object + # TODO: Implement a proper Bloom filter + data: HashSet[MessageID] + + ReliabilityManager* = ref object + lamportTimestamp: int64 + messageHistory: seq[MessageID] + bloomFilter: RollingBloomFilter + outgoingBuffer: seq[UnacknowledgedMessage] + incomingBuffer: seq[Message] + channelId: string + onMessageReady*: proc(messageId: MessageID) + onMessageSent*: proc(messageId: MessageID) + onPeriodicSync*: proc() + +proc newRollingBloomFilter(): RollingBloomFilter = + result.data = initHashSet[MessageID]() + +proc add(filter: var RollingBloomFilter, item: MessageID) = + filter.data.incl(item) + +proc contains(filter: RollingBloomFilter, item: MessageID): bool = + item in filter.data + +proc newReliabilityManager*(channelId: string): ReliabilityManager = + result = ReliabilityManager( + lamportTimestamp: 0, + messageHistory: @[], + bloomFilter: newRollingBloomFilter(), + outgoingBuffer: @[], + incomingBuffer: @[], + channelId: channelId + ) + +proc generateUniqueID(): MessageID = + $secureHash($getTime().toUnix & $rand(high(int))) + +proc updateLamportTimestamp(rm: ReliabilityManager, msgTs: int64) = + rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp + 1) + +proc getRecentMessageIDs(rm: ReliabilityManager, n: int): seq[MessageID] = + result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] + +proc wrapOutgoingMessage*(rm: ReliabilityManager, message: string): Message = + rm.updateLamportTimestamp(getTime().toUnix) + let msg = Message( + senderId: "TODO_SENDER_ID", + messageId: generateUniqueID(), + lamportTimestamp: rm.lamportTimestamp, + causalHistory: rm.getRecentMessageIDs(10), + channelId: rm.channelId, + content: message, + bloomFilter: @[] # TODO: Implement proper Bloom filter serialization + ) + rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)) + msg + +proc unwrapReceivedMessage*(rm: ReliabilityManager, message: Message): tuple[message: Message, missingDeps: seq[MessageID]] = + if rm.bloomFilter.contains(message.messageId): + return (message, @[]) + + rm.bloomFilter.add(message.messageId) + rm.updateLamportTimestamp(message.lamportTimestamp) + + var missingDeps: seq[MessageID] = @[] + for depId in message.causalHistory: + if depId notin rm.messageHistory: + missingDeps.add(depId) + + if missingDeps.len == 0: + rm.messageHistory.add(message.messageId) + if rm.onMessageReady != nil: + rm.onMessageReady(message.messageId) + else: + rm.incomingBuffer.add(message) + + (message, missingDeps) + +proc markDependenciesMet*(rm: ReliabilityManager, messageIds: seq[MessageID]) = + var processedMessages: seq[Message] = @[] + rm.incomingBuffer = rm.incomingBuffer.filterIt( + not messageIds.allIt(it in it.causalHistory or it in rm.messageHistory) + ) + + for msg in processedMessages: + rm.messageHistory.add(msg.messageId) + if rm.onMessageReady != nil: + rm.onMessageReady(msg.messageId) + +proc checkUnacknowledgedMessages(rm: ReliabilityManager) = + let now = getTime() + rm.outgoingBuffer = rm.outgoingBuffer.filterIt((now - it.sendTime).inSeconds < 60) + for msg in rm.outgoingBuffer: + if rm.onMessageSent != nil: + rm.onMessageSent(msg.message.messageId) + +proc periodicSync(rm: ReliabilityManager) = + if rm.onPeriodicSync != nil: + rm.onPeriodicSync() + +proc setCallbacks*(rm: ReliabilityManager, + onMessageReady: proc(messageId: MessageID), + onMessageSent: proc(messageId: MessageID), + onPeriodicSync: proc()) = + rm.onMessageReady = onMessageReady + rm.onMessageSent = onMessageSent + rm.onPeriodicSync = onPeriodicSync + +# Logging +proc logInfo(msg: string) = + info msg + +proc logError(msg: string) = + error msg + +# Export C API +{.push exportc, cdecl.} + +type + CMessage {.bycopy.} = object + senderId: cstring + messageId: cstring + lamportTimestamp: int64 + causalHistory: ptr UncheckedArray[cstring] + causalHistoryLen: cint + channelId: cstring + content: cstring + bloomFilter: ptr UncheckedArray[byte] + bloomFilterLen: cint + + CUnwrapResult {.bycopy.} = object + message: CMessage + missingDeps: ptr UncheckedArray[cstring] + missingDepsLen: cint + +proc reliability_manager_new(channelId: cstring): pointer {.exportc, cdecl.} = + let rm = newReliabilityManager($channelId) + GC_ref(rm) + result = cast[pointer](rm) + +proc reliability_manager_free(rmPtr: pointer) {.exportc, cdecl.} = + let rm = cast[ReliabilityManager](rmPtr) + GC_unref(rm) + +proc wrap_outgoing_message(rmPtr: pointer, message: cstring): CMessage {.exportc, cdecl.} = + let rm = cast[ReliabilityManager](rmPtr) + let wrappedMsg = rm.wrapOutgoingMessage($message) + + result.senderId = wrappedMsg.senderId.cstring + result.messageId = wrappedMsg.messageId.cstring + result.lamportTimestamp = wrappedMsg.lamportTimestamp + result.causalHistory = cast[ptr UncheckedArray[cstring]](alloc0(wrappedMsg.causalHistory.len * sizeof(cstring))) + result.causalHistoryLen = wrappedMsg.causalHistory.len.cint + for i, id in wrappedMsg.causalHistory: + result.causalHistory[i] = id.cstring + result.channelId = wrappedMsg.channelId.cstring + result.content = wrappedMsg.content.cstring + result.bloomFilter = cast[ptr UncheckedArray[byte]](alloc0(wrappedMsg.bloomFilter.len)) + result.bloomFilterLen = wrappedMsg.bloomFilter.len.cint + copyMem(result.bloomFilter, addr wrappedMsg.bloomFilter[0], wrappedMsg.bloomFilter.len) + +proc unwrap_received_message(rmPtr: pointer, msg: CMessage): CUnwrapResult {.exportc, cdecl.} = + let rm = cast[ReliabilityManager](rmPtr) + var nimMsg = Message( + senderId: $msg.senderId, + messageId: $msg.messageId, + lamportTimestamp: msg.lamportTimestamp, + causalHistory: newSeq[string](msg.causalHistoryLen), + channelId: $msg.channelId, + content: $msg.content, + bloomFilter: newSeq[byte](msg.bloomFilterLen) + ) + for i in 0 ..< msg.causalHistoryLen: + nimMsg.causalHistory[i] = $msg.causalHistory[i] + copyMem(addr nimMsg.bloomFilter[0], msg.bloomFilter, msg.bloomFilterLen) + + let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(nimMsg) + + result.message = CMessage( + senderId: unwrappedMsg.senderId.cstring, + messageId: unwrappedMsg.messageId.cstring, + lamportTimestamp: unwrappedMsg.lamportTimestamp, + causalHistory: cast[ptr UncheckedArray[cstring]](alloc0(unwrappedMsg.causalHistory.len * sizeof(cstring))), + causalHistoryLen: unwrappedMsg.causalHistory.len.cint, + channelId: unwrappedMsg.channelId.cstring, + content: unwrappedMsg.content.cstring, + bloomFilter: cast[ptr UncheckedArray[byte]](alloc0(unwrappedMsg.bloomFilter.len)), + bloomFilterLen: unwrappedMsg.bloomFilter.len.cint + ) + for i, id in unwrappedMsg.causalHistory: + result.message.causalHistory[i] = id.cstring + copyMem(result.message.bloomFilter, addr unwrappedMsg.bloomFilter[0], unwrappedMsg.bloomFilter.len) + + result.missingDeps = cast[ptr UncheckedArray[cstring]](alloc0(missingDeps.len * sizeof(cstring))) + result.missingDepsLen = missingDeps.len.cint + for i, id in missingDeps: + result.missingDeps[i] = id.cstring + +proc mark_dependencies_met(rmPtr: pointer, messageIds: ptr UncheckedArray[cstring], count: cint) {.exportc, cdecl.} = + let rm = cast[ReliabilityManager](rmPtr) + var nimMessageIds = newSeq[string](count) + for i in 0 ..< count: + nimMessageIds[i] = $messageIds[i] + rm.markDependenciesMet(nimMessageIds) + +proc set_callbacks(rmPtr: pointer, + onMessageReady: proc(messageId: cstring) {.cdecl.}, + onMessageSent: proc(messageId: cstring) {.cdecl.}, + onPeriodicSync: proc() {.cdecl.}) {.exportc, cdecl.} = + let rm = cast[ReliabilityManager](rmPtr) + rm.setCallbacks( + proc(messageId: MessageID) = onMessageReady(messageId.cstring), + proc(messageId: MessageID) = onMessageSent(messageId.cstring), + onPeriodicSync + ) + +{.pop.} + +when isMainModule: + # TODO: Add some basic tests / examples + let rm = newReliabilityManager("testChannel") + let msg = rm.wrapOutgoingMessage("Hello, World!") + echo "Wrapped message: ", msg + + let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(msg) + echo "Unwrapped message: ", unwrappedMsg + echo "Missing dependencies: ", missingDeps \ No newline at end of file diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim new file mode 100644 index 0000000..241ae43 --- /dev/null +++ b/tests/test_reliability.nim @@ -0,0 +1,79 @@ +import unittest +import ../src/reliability + +suite "ReliabilityManager": + setup: + let rm = newReliabilityManager("testChannel") + + test "wrapOutgoingMessage": + let msg = rm.wrapOutgoingMessage("Hello, World!") + check: + msg.content == "Hello, World!" + msg.channelId == "testChannel" + msg.causalHistory.len == 0 + + test "unwrapReceivedMessage": + let wrappedMsg = rm.wrapOutgoingMessage("Test message") + let (unwrappedMsg, missingDeps) = rm.unwrapReceivedMessage(wrappedMsg) + check: + unwrappedMsg.content == "Test message" + missingDeps.len == 0 + + test "markDependenciesMet": + let msg1 = rm.wrapOutgoingMessage("Message 1") + let msg2 = rm.wrapOutgoingMessage("Message 2") + let msg3 = rm.wrapOutgoingMessage("Message 3") + + var (_, missingDeps) = rm.unwrapReceivedMessage(msg3) + check missingDeps.len == 2 + + rm.markDependenciesMet(@[msg1.messageId, msg2.messageId]) + (_, missingDeps) = rm.unwrapReceivedMessage(msg3) + check missingDeps.len == 0 + + test "callbacks": + var messageReadyCount = 0 + var messageSentCount = 0 + var periodicSyncCount = 0 + + rm.setCallbacks( + proc(messageId: MessageID) = messageReadyCount += 1, + proc(messageId: MessageID) = messageSentCount += 1, + proc() = periodicSyncCount += 1 + ) + + let msg = rm.wrapOutgoingMessage("Test callback") + discard rm.unwrapReceivedMessage(msg) + + check: + messageReadyCount == 1 + messageSentCount == 0 # This would be triggered by the checkUnacknowledgedMessages function + periodicSyncCount == 0 # This would be triggered by the periodicSync function + + test "lamport timestamps": + let msg1 = rm.wrapOutgoingMessage("Message 1") + let msg2 = rm.wrapOutgoingMessage("Message 2") + check msg2.lamportTimestamp > msg1.lamportTimestamp + + let msg3 = Message(lamportTimestamp: msg2.lamportTimestamp + 10, messageId: generateUniqueID(), content: "Message 3") + discard rm.unwrapReceivedMessage(msg3) + let msg4 = rm.wrapOutgoingMessage("Message 4") + check msg4.lamportTimestamp > msg3.lamportTimestamp + + test "causal history": + let msg1 = rm.wrapOutgoingMessage("Message 1") + let msg2 = rm.wrapOutgoingMessage("Message 2") + let msg3 = rm.wrapOutgoingMessage("Message 3") + + check: + msg2.causalHistory.contains(msg1.messageId) + msg3.causalHistory.contains(msg2.messageId) + msg3.causalHistory.contains(msg1.messageId) + + test "bloom filter": + let msg1 = rm.wrapOutgoingMessage("Message 1") + let (_, missingDeps1) = rm.unwrapReceivedMessage(msg1) + check missingDeps1.len == 0 + + let (_, missingDeps2) = rm.unwrapReceivedMessage(msg1) + check missingDeps2.len == 0 # The message should be in the bloom filter and not processed again \ No newline at end of file