From bb9e89b4a5f3da375bccc6bbe320a1b4690a4774 Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Thu, 30 Jan 2025 22:44:55 +0530 Subject: [PATCH] chore: address review comments --- src/bloom.nim | 2 +- src/message.nim | 4 -- src/private/probabilities.nim | 6 ++- src/protobuf.nim | 29 +++++------ src/reliability_utils.nim | 21 ++++---- src/rolling_bloom_filter.nim | 94 ++++++++++++++++++++--------------- 6 files changed, 85 insertions(+), 71 deletions(-) diff --git a/src/bloom.nim b/src/bloom.nim index 92b0712..7a92335 100644 --- a/src/bloom.nim +++ b/src/bloom.nim @@ -10,7 +10,7 @@ type errorRate*: float kHashes*: int mBits*: int - intArray: seq[int] + intArray*: seq[int] {.push overflowChecks: off.} # Turn off overflow checks for hashing operations diff --git a/src/message.nim b/src/message.nim index e556ec9..d792113 100644 --- a/src/message.nim +++ b/src/message.nim @@ -16,10 +16,6 @@ type sendTime*: Time resendAttempts*: int - TimestampedMessageID* = object - id*: MessageID - timestamp*: Time - const DefaultMaxMessageHistory* = 1000 DefaultMaxCausalHistory* = 10 diff --git a/src/private/probabilities.nim b/src/private/probabilities.nim index 0d2ddc6..6eeb28e 100644 --- a/src/private/probabilities.nim +++ b/src/private/probabilities.nim @@ -9,7 +9,9 @@ type TErrorForK = seq[float] TAllErrorRates* = array[0..12, TErrorForK] -let kErrors*: TAllErrorRates = [ +var kErrors* {.threadvar.}: TAllErrorRates + +kErrors = [ @[1.0], @[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, @@ -95,4 +97,4 @@ let kErrors*: TAllErrorRates = [ 0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, 0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900, 0.0000016500, 0.0000012000, 0.0000008740] -] +] \ No newline at end of file diff --git a/src/protobuf.nim b/src/protobuf.nim index 87e75e9..b929fb2 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -1,5 +1,6 @@ import libp2p/protobuf/minprotobuf import std/options +import endians import ../src/[message, protobufutil, bloom, reliability_utils] proc toBytes(s: string): seq[byte] = @@ -56,17 +57,13 @@ proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] = let pb = encode(msg) ok(pb.buffer) except: - err(reSerializationError) + return err(reSerializationError) proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] = - try: - let msgResult = Message.decode(data) - if msgResult.isOk: - ok(msgResult.get) - else: - err(reSerializationError) - except: - err(reDeserializationError) + let msg = Message.decode(data).valueOr: + return err(ReliabilityError.reDeserializationError) + + ok(msg) proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] = try: @@ -75,8 +72,10 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr # Convert intArray to bytes var bytes = newSeq[byte](filter.intArray.len * sizeof(int)) for i, val in filter.intArray: + var leVal: int + littleEndian64(addr leVal, unsafeAddr val) let start = i * sizeof(int) - copyMem(addr bytes[start], unsafeAddr val, sizeof(int)) + copyMem(addr bytes[start], addr leVal, sizeof(int)) pb.write(1, bytes) pb.write(2, uint64(filter.capacity)) @@ -87,11 +86,11 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr pb.finish() ok(pb.buffer) except: - err(reSerializationError) + return err(ReliabilityError.reSerializationError) proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] = if data.len == 0: - return err(reDeserializationError) + return err(ReliabilityError.reDeserializationError) try: let pb = initProtoBuffer(data) @@ -108,8 +107,10 @@ proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityEr # Convert bytes back to intArray var intArray = newSeq[int](bytes.len div sizeof(int)) for i in 0 ..< intArray.len: + var leVal: int let start = i * sizeof(int) - copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int)) + copyMem(addr leVal, unsafeAddr bytes[start], sizeof(int)) + littleEndian64(addr intArray[i], addr leVal) ok(BloomFilter( intArray: intArray, @@ -119,4 +120,4 @@ proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityEr mBits: int(mBits) )) except: - err(reDeserializationError) \ No newline at end of file + return err(ReliabilityError.reDeserializationError) \ No newline at end of file diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 28b63f2..34b02ff 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -7,7 +7,6 @@ type ReliabilityConfig* = object bloomFilterCapacity*: int bloomFilterErrorRate*: float - bloomFilterWindow*: times.Duration maxMessageHistory*: int maxCausalHistory*: int resendInterval*: times.Duration @@ -29,7 +28,7 @@ type onMissingDependencies*: proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} onPeriodicSync*: PeriodicSyncCallback - ReliabilityError* = enum + ReliabilityError* {.pure.} = enum reInvalidArgument reOutOfMemory reInternalError @@ -45,7 +44,6 @@ proc defaultConfig*(): ReliabilityConfig = ReliabilityConfig( bloomFilterCapacity: DefaultBloomFilterCapacity, bloomFilterErrorRate: DefaultBloomFilterErrorRate, - bloomFilterWindow: DefaultBloomFilterWindow, maxMessageHistory: DefaultMaxMessageHistory, maxCausalHistory: DefaultMaxCausalHistory, resendInterval: DefaultResendInterval, @@ -55,21 +53,24 @@ proc defaultConfig*(): ReliabilityConfig = ) proc cleanup*(rm: ReliabilityManager) {.raises: [].} = - if not rm.isNil: + if not rm.isNil(): {.gcsafe.}: try: - rm.outgoingBuffer.setLen(0) - rm.incomingBuffer.setLen(0) - rm.messageHistory.setLen(0) - except Exception as e: + withLock rm.lock: + rm.outgoingBuffer.setLen(0) + rm.incomingBuffer.setLen(0) + rm.messageHistory.setLen(0) + except ValueError as e: logError("Error during cleanup: " & e.msg) + except Exception: + logError("Error during cleanup: " & getCurrentExceptionMsg()) proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} = withLock rm.lock: try: rm.bloomFilter.clean() - except Exception as e: - logError("Failed to clean ReliabilityManager bloom filter: " & e.msg) + except Exception: + logError("Failed to clean ReliabilityManager bloom filter: " & getCurrentExceptionMsg()) proc addToHistory*(rm: ReliabilityManager, msgId: MessageID) {.gcsafe, raises: [].} = rm.messageHistory.add(msgId) diff --git a/src/rolling_bloom_filter.nim b/src/rolling_bloom_filter.nim index 78c526b..75f5b44 100644 --- a/src/rolling_bloom_filter.nim +++ b/src/rolling_bloom_filter.nim @@ -1,4 +1,3 @@ -import std/times import chronos import chronicles import ./[bloom, message] @@ -6,13 +5,15 @@ import ./[bloom, message] type RollingBloomFilter* = object filter*: BloomFilter - window*: times.Duration - messages*: seq[TimestampedMessageID] + capacity*: int + minCapacity*: int + maxCapacity*: int + messages*: seq[MessageID] const DefaultBloomFilterCapacity* = 10000 DefaultBloomFilterErrorRate* = 0.001 - DefaultBloomFilterWindow* = initDuration(hours = 1) + CapacityFlexPercent* = 20 proc logError*(msg: string) = error "ReliabilityError", message = msg @@ -20,7 +21,7 @@ proc logError*(msg: string) = proc logInfo*(msg: string) = info "ReliabilityInfo", message = msg -proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Duration): RollingBloomFilter {.gcsafe.} = +proc newRollingBloomFilter*(capacity: int, errorRate: float): RollingBloomFilter {.gcsafe.} = try: var filterResult: Result[BloomFilter, string] {.gcsafe.}: @@ -28,29 +29,62 @@ proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Durat if filterResult.isOk: logInfo("Successfully initialized bloom filter") + let targetCapacity = capacity + let minCapacity = (capacity.float * 0.8).int + let maxCapacity = (capacity.float * 1.2).int return RollingBloomFilter( - filter: filterResult.get(), # Extract the BloomFilter from Result - window: window, + filter: filterResult.get(), + capacity: targetCapacity, + minCapacity: minCapacity, + maxCapacity: maxCapacity, messages: @[] ) else: logError("Failed to initialize bloom filter: " & filterResult.error) - # Fall through to default case below - - except: - logError("Failed to initialize bloom filter") + except Exception: + logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg()) + # Default fallback case let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) if defaultResult.isOk: return RollingBloomFilter( filter: defaultResult.get(), - window: window, + capacity: DefaultBloomFilterCapacity, + minCapacity: (DefaultBloomFilterCapacity.float * 0.8).int, + maxCapacity: (DefaultBloomFilterCapacity.float * 1.2).int, messages: @[] ) else: - # If even default initialization fails, raise an exception - logError("Failed to initialize bloom filter with default parameters") + logError("Failed to initialize bloom filter with default parameters: " & defaultResult.error) + +proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = + try: + if rbf.messages.len <= rbf.maxCapacity: + return # Don't clean unless we exceed max capacity + + # Initialize new filter + let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate) + if newFilterResult.isErr: + logError("Failed to create new bloom filter: " & newFilterResult.error) + return + + var newFilter = newFilterResult.get() + + # Keep most recent messages up to minCapacity + let keepCount = rbf.minCapacity + let startIdx = max(0, rbf.messages.len - keepCount) + var newMessages: seq[MessageID] = @[] + + for i in startIdx ..< rbf.messages.len: + newMessages.add(rbf.messages[i]) + newFilter.insert(rbf.messages[i]) + + rbf.messages = newMessages + rbf.filter = newFilter + + except Exception: + logError("Failed to clean bloom filter: " & getCurrentExceptionMsg()) proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} = ## Adds a message ID to the rolling bloom filter. @@ -58,7 +92,11 @@ proc add*(rbf: var RollingBloomFilter, messageId: MessageID) {.gcsafe.} = ## Parameters: ## - messageId: The ID of the message to add. rbf.filter.insert(messageId) - rbf.messages.add(TimestampedMessageID(id: messageId, timestamp: getTime())) + rbf.messages.add(messageId) + + # Clean if we exceed max capacity + if rbf.messages.len > rbf.maxCapacity: + rbf.clean() proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} = ## Checks if a message ID is in the rolling bloom filter. @@ -68,28 +106,4 @@ proc contains*(rbf: RollingBloomFilter, messageId: MessageID): bool {.gcsafe.} = ## ## Returns: ## True if the message ID is probably in the filter, false otherwise. - rbf.filter.lookup(messageId) - -proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = - try: - let now = getTime() - let cutoff = now - rbf.window - var newMessages: seq[TimestampedMessageID] = @[] - - # Initialize new filter - let newFilterResult = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate) - if newFilterResult.isErr: - logError("Failed to create new bloom filter: " & newFilterResult.error) - return - - var newFilter = newFilterResult.get() - - for msg in rbf.messages: - if msg.timestamp > cutoff: - newMessages.add(msg) - newFilter.insert(msg.id) - - rbf.messages = newMessages - rbf.filter = newFilter - except Exception as e: - logError("Failed to clean bloom filter: " & e.msg) \ No newline at end of file + rbf.filter.lookup(messageId) \ No newline at end of file