From 24d60267506b45f05567b45aa86dcedfa919c307 Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Mon, 13 Jan 2025 17:32:33 +0400 Subject: [PATCH] chore: rearrange files --- src/common.nim | 74 ---------------- src/message.nim | 30 +++++++ src/protobuf.nim | 4 +- src/protobufutil.nim | 8 +- src/reliability.nim | 73 +--------------- src/reliability_utils.nim | 95 +++++++++++++++++++++ src/{utils.nim => rolling_bloom_filter.nim} | 51 ++++------- tests/test_reliability.nim | 39 ++++++--- 8 files changed, 172 insertions(+), 202 deletions(-) delete mode 100644 src/common.nim create mode 100644 src/message.nim create mode 100644 src/reliability_utils.nim rename src/{utils.nim => rolling_bloom_filter.nim} (68%) diff --git a/src/common.nim b/src/common.nim deleted file mode 100644 index e2bdde8..0000000 --- a/src/common.nim +++ /dev/null @@ -1,74 +0,0 @@ -import std/[times, locks] -import ./bloom - -type - MessageID* = string - - Message* = object - messageId*: MessageID - lamportTimestamp*: int64 - causalHistory*: seq[MessageID] - channelId*: string - content*: seq[byte] - bloomFilter*: seq[byte] - - UnacknowledgedMessage* = object - message*: Message - sendTime*: Time - resendAttempts*: int - - TimestampedMessageID* = object - id*: MessageID - timestamp*: Time - - PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} - - RollingBloomFilter* = object - filter*: BloomFilter - window*: times.Duration - messages*: seq[TimestampedMessageID] - - ReliabilityConfig* = object - bloomFilterCapacity*: int - bloomFilterErrorRate*: float - bloomFilterWindow*: times.Duration - maxMessageHistory*: int - maxCausalHistory*: int - resendInterval*: times.Duration - maxResendAttempts*: int - syncMessageInterval*: times.Duration - bufferSweepInterval*: times.Duration - - ReliabilityManager* = ref object - lamportTimestamp*: int64 - messageHistory*: seq[MessageID] - bloomFilter*: RollingBloomFilter - outgoingBuffer*: seq[UnacknowledgedMessage] - incomingBuffer*: seq[Message] - channelId*: string - config*: ReliabilityConfig - lock*: Lock - onMessageReady*: proc(messageId: MessageID) {.gcsafe.} - onMessageSent*: proc(messageId: MessageID) {.gcsafe.} - onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} - onPeriodicSync*: PeriodicSyncCallback - - ReliabilityError* = enum - reInvalidArgument - reOutOfMemory - reInternalError - reSerializationError - reDeserializationError - reMessageTooLarge - -const - DefaultBloomFilterCapacity* = 10000 - DefaultBloomFilterErrorRate* = 0.001 - DefaultBloomFilterWindow* = initDuration(hours = 1) - DefaultMaxMessageHistory* = 1000 - DefaultMaxCausalHistory* = 10 - DefaultResendInterval* = initDuration(seconds = 60) - DefaultMaxResendAttempts* = 5 - DefaultSyncMessageInterval* = initDuration(seconds = 30) - DefaultBufferSweepInterval* = initDuration(seconds = 60) - MaxMessageSize* = 1024 * 1024 # 1 MB \ No newline at end of file diff --git a/src/message.nim b/src/message.nim new file mode 100644 index 0000000..e556ec9 --- /dev/null +++ b/src/message.nim @@ -0,0 +1,30 @@ +import std/times + +type + MessageID* = string + + Message* = object + messageId*: MessageID + lamportTimestamp*: int64 + causalHistory*: seq[MessageID] + channelId*: string + content*: seq[byte] + bloomFilter*: seq[byte] + + UnacknowledgedMessage* = object + message*: Message + sendTime*: Time + resendAttempts*: int + + TimestampedMessageID* = object + id*: MessageID + timestamp*: Time + +const + DefaultMaxMessageHistory* = 1000 + DefaultMaxCausalHistory* = 10 + DefaultResendInterval* = initDuration(seconds = 60) + DefaultMaxResendAttempts* = 5 + DefaultSyncMessageInterval* = initDuration(seconds = 30) + DefaultBufferSweepInterval* = initDuration(seconds = 60) + MaxMessageSize* = 1024 * 1024 # 1 MB \ No newline at end of file diff --git a/src/protobuf.nim b/src/protobuf.nim index acdd733..87e75e9 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -1,8 +1,6 @@ -import ./protobufutil -import ./common -import ./bloom import libp2p/protobuf/minprotobuf import std/options +import ../src/[message, protobufutil, bloom, reliability_utils] proc toBytes(s: string): seq[byte] = result = newSeq[byte](s.len) diff --git a/src/protobufutil.nim b/src/protobufutil.nim index 15b3e33..a4928f3 100644 --- a/src/protobufutil.nim +++ b/src/protobufutil.nim @@ -11,13 +11,12 @@ type ProtobufErrorKind* {.pure.} = enum DecodeFailure MissingRequiredField - InvalidLengthField ProtobufError* = object case kind*: ProtobufErrorKind of DecodeFailure: error*: minprotobuf.ProtoError - of MissingRequiredField, InvalidLengthField: + of MissingRequiredField: field*: string ProtobufResult*[T] = Result[T, ProtobufError] @@ -30,7 +29,4 @@ converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError = ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err) proc missingRequiredField*(T: type ProtobufError, field: string): T = - ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field) - -proc invalidLengthField*(T: type ProtobufError, field: string): T = - ProtobufError(kind: ProtobufErrorKind.InvalidLengthField, field: field) \ No newline at end of file + ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field) \ No newline at end of file diff --git a/src/reliability.nim b/src/reliability.nim index 31280da..29e3c36 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -1,26 +1,6 @@ -import std/[times, locks] +import std/[times, locks, tables, sets] import chronos, results -import ./common -import ./utils -import ./protobuf -import std/[tables, sets] - -proc defaultConfig*(): ReliabilityConfig = - ## Creates a default configuration for the ReliabilityManager. - ## - ## Returns: - ## A ReliabilityConfig object with default values. - ReliabilityConfig( - bloomFilterCapacity: DefaultBloomFilterCapacity, - bloomFilterErrorRate: DefaultBloomFilterErrorRate, - bloomFilterWindow: DefaultBloomFilterWindow, - maxMessageHistory: DefaultMaxMessageHistory, - maxCausalHistory: DefaultMaxCausalHistory, - resendInterval: DefaultResendInterval, - maxResendAttempts: DefaultMaxResendAttempts, - syncMessageInterval: DefaultSyncMessageInterval, - bufferSweepInterval: DefaultBufferSweepInterval - ) +import ../src/[message, protobuf, reliability_utils, rolling_bloom_filter] proc newReliabilityManager*(channelId: string, config: ReliabilityConfig = defaultConfig()): Result[ReliabilityManager, ReliabilityError] = ## Creates a new ReliabilityManager with the specified channel ID and configuration. @@ -189,43 +169,6 @@ proc processIncomingBuffer(rm: ReliabilityManager) = newIncomingBuffer.add(msg) rm.incomingBuffer = newIncomingBuffer - # withLock rm.lock: - # var processedAny = true - # while processedAny: - # processedAny = false - # var newIncomingBuffer: seq[Message] = @[] - - # for msg in rm.incomingBuffer: - # var allDependenciesMet = true - # for depId in msg.causalHistory: - # if not rm.bloomFilter.contains(depId): - # allDependenciesMet = false - # break - - # # Check if dependency is still in incoming buffer - # for bufferedMsg in rm.incomingBuffer: - # if bufferedMsg.messageId == depId: - # allDependenciesMet = false - # break - - # if not allDependenciesMet: - # break - - # if allDependenciesMet: - # # Process message - # rm.addToHistory(msg.messageId) - # if rm.onMessageReady != nil: - # rm.onMessageReady(msg.messageId) - # processedAny = true - # else: - # # Keep in buffer - # newIncomingBuffer.add(msg) - - # rm.incomingBuffer = newIncomingBuffer - - # # Exit if no messages were processed in this pass - # if not processedAny: - # break proc unwrapReceivedMessage*(rm: ReliabilityManager, message: seq[byte]): Result[tuple[message: seq[byte], missingDeps: seq[MessageID]], ReliabilityError] = ## Unwraps a received message and processes its reliability metadata. @@ -396,14 +339,4 @@ proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityE ) return ok() except: - return err(reInternalError) - -proc cleanup*(rm: ReliabilityManager) {.raises: [].} = - if not rm.isNil: - {.gcsafe.}: - try: - rm.outgoingBuffer.setLen(0) - rm.incomingBuffer.setLen(0) - rm.messageHistory.setLen(0) - except Exception as e: - logError("Error during cleanup: " & e.msg) \ No newline at end of file + return err(reInternalError) \ No newline at end of file diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim new file mode 100644 index 0000000..28b63f2 --- /dev/null +++ b/src/reliability_utils.nim @@ -0,0 +1,95 @@ +import std/[times, locks] +import ./[rolling_bloom_filter, message] + +type + PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} + + ReliabilityConfig* = object + bloomFilterCapacity*: int + bloomFilterErrorRate*: float + bloomFilterWindow*: times.Duration + maxMessageHistory*: int + maxCausalHistory*: int + resendInterval*: times.Duration + maxResendAttempts*: int + syncMessageInterval*: times.Duration + bufferSweepInterval*: times.Duration + + ReliabilityManager* = ref object + lamportTimestamp*: int64 + messageHistory*: seq[MessageID] + bloomFilter*: RollingBloomFilter + outgoingBuffer*: seq[UnacknowledgedMessage] + incomingBuffer*: seq[Message] + channelId*: string + config*: ReliabilityConfig + lock*: Lock + onMessageReady*: proc(messageId: MessageID) {.gcsafe.} + onMessageSent*: proc(messageId: MessageID) {.gcsafe.} + onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} + onPeriodicSync*: PeriodicSyncCallback + + ReliabilityError* = enum + reInvalidArgument + reOutOfMemory + reInternalError + reSerializationError + reDeserializationError + reMessageTooLarge + +proc defaultConfig*(): ReliabilityConfig = + ## Creates a default configuration for the ReliabilityManager. + ## + ## Returns: + ## A ReliabilityConfig object with default values. + ReliabilityConfig( + bloomFilterCapacity: DefaultBloomFilterCapacity, + bloomFilterErrorRate: DefaultBloomFilterErrorRate, + bloomFilterWindow: DefaultBloomFilterWindow, + maxMessageHistory: DefaultMaxMessageHistory, + maxCausalHistory: DefaultMaxCausalHistory, + resendInterval: DefaultResendInterval, + maxResendAttempts: DefaultMaxResendAttempts, + syncMessageInterval: DefaultSyncMessageInterval, + bufferSweepInterval: DefaultBufferSweepInterval + ) + +proc cleanup*(rm: ReliabilityManager) {.raises: [].} = + if not rm.isNil: + {.gcsafe.}: + try: + rm.outgoingBuffer.setLen(0) + rm.incomingBuffer.setLen(0) + rm.messageHistory.setLen(0) + except Exception as e: + logError("Error during cleanup: " & e.msg) + +proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = + withLock rm.lock: + try: + rm.bloomFilter.clean() + except Exception as e: + logError("Failed to clean ReliabilityManager bloom filter: " & e.msg) + +proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} = + rm.messageHistory.add(msgId) + if rm.messageHistory.len > rm.config.maxMessageHistory: + rm.messageHistory.delete(0) + +proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) {.gcsafe, raises: [].} = + rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 + +proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] = + result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] + +proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] = + withLock rm.lock: + result = rm.messageHistory + +proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = + withLock rm.lock: + result = rm.outgoingBuffer + +proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] = + withLock rm.lock: + result = rm.incomingBuffer \ No newline at end of file diff --git a/src/utils.nim b/src/rolling_bloom_filter.nim similarity index 68% rename from src/utils.nim rename to src/rolling_bloom_filter.nim index 773d391..78c526b 100644 --- a/src/utils.nim +++ b/src/rolling_bloom_filter.nim @@ -1,7 +1,18 @@ -import std/[times, locks] -import chronos, chronicles -import ./bloom -import ./common +import std/times +import chronos +import chronicles +import ./[bloom, message] + +type + RollingBloomFilter* = object + filter*: BloomFilter + window*: times.Duration + messages*: seq[TimestampedMessageID] + +const + DefaultBloomFilterCapacity* = 10000 + DefaultBloomFilterErrorRate* = 0.001 + DefaultBloomFilterWindow* = initDuration(hours = 1) proc logError*(msg: string) = error "ReliabilityError", message = msg @@ -81,34 +92,4 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = rbf.messages = newMessages rbf.filter = newFilter except Exception as e: - logError("Failed to clean bloom filter: " & e.msg) - -proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = - withLock rm.lock: - try: - rm.bloomFilter.clean() - except Exception as e: - logError("Failed to clean ReliabilityManager bloom filter: " & e.msg) - -proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) = - rm.messageHistory.add(msgId) - if rm.messageHistory.len > rm.config.maxMessageHistory: - rm.messageHistory.delete(0) - -proc updateLamportTimestamp*(rm: ReliabilityManager, msgTs: int64) = - rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 - -proc getRecentMessageIDs*(rm: ReliabilityManager, n: int): seq[MessageID] = - result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] - -proc getMessageHistory*(rm: ReliabilityManager): seq[MessageID] = - withLock rm.lock: - result = rm.messageHistory - -proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = - withLock rm.lock: - result = rm.outgoingBuffer - -proc getIncomingBuffer*(rm: ReliabilityManager): seq[Message] = - withLock rm.lock: - result = rm.incomingBuffer \ No newline at end of file + logError("Failed to clean bloom filter: " & e.msg) \ No newline at end of file diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 091bcd6..42a156e 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1,8 +1,5 @@ import unittest, results, chronos, std/times -import ../src/reliability -import ../src/common -import ../src/protobuf -import ../src/utils +import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter] # Core functionality tests suite "Core Operations": @@ -296,12 +293,14 @@ suite "Periodic Tasks & Buffer Management": finalBuffer.len == 3 # Should have removed acknowledged messages messageSentCount == 3 # Should have triggered sent callback for acknowledged messages - test "periodic buffer sweep": + test "periodic buffer sweep and bloom clean": var messageSentCount = 0 var config = defaultConfig() - config.resendInterval = initDuration(milliseconds = 100) # Very short for testing - config.bufferSweepInterval = initDuration(milliseconds = 50) + config.resendInterval = initDuration(milliseconds = 100) # Short for testing + config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps + config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window + config.maxResendAttempts = 3 # Set a low number of max attempts let rmResultP = newReliabilityManager("testChannel", config) check rmResultP.isOk() @@ -313,27 +312,39 @@ suite "Periodic Tasks & Buffer Management": proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard ) - # Add message to buffer + # First message - should be cleaned from bloom filter later let msg1 = @[byte(1)] let id1 = "msg1" let wrap1 = rm.wrapOutgoingMessage(msg1, id1) check wrap1.isOk() let initialBuffer = rm.getOutgoingBuffer() - check initialBuffer[0].resendAttempts == 0 + check: + initialBuffer[0].resendAttempts == 0 + rm.bloomFilter.contains(id1) rm.startPeriodicTasks() - # Wait long enough for several sweep intervals - waitFor sleepAsync(chronos.milliseconds(300)) + # Wait long enough for bloom filter window to pass and first message to exceed max retries + waitFor sleepAsync(chronos.milliseconds(500)) + + # Add new message + let msg2 = @[byte(2)] + let id2 = "msg2" + let wrap2 = rm.wrapOutgoingMessage(msg2, id2) + check wrap2.isOk() + let finalBuffer = rm.getOutgoingBuffer() check: - finalBuffer.len == 1 - finalBuffer[0].resendAttempts > 0 + finalBuffer.len == 1 # Only msg2 should be in buffer, msg1 should be removed after max retries + finalBuffer[0].message.messageId == id2 # Verify it's the second message + finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts + not rm.bloomFilter.contains(id1) # Bloom filter cleaning check + rm.bloomFilter.contains(id2) # New message still in filter rm.cleanup() - test "periodic sync": + test "periodic sync callback": var syncCallCount = 0 rm.setCallbacks( proc(messageId: MessageID) {.gcsafe.} = discard,