From 32146675f0a9d38214d69d5292b179a00db289e8 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Wed, 28 May 2025 17:56:13 +0100 Subject: [PATCH] fix: fix nph formatting --- src/bloom.nim | 65 ++++++------ src/message.nim | 2 +- src/private/probabilities.nim | 173 ++++++++++++++++--------------- src/protobuf.nim | 52 +++++----- src/protobufutil.nim | 2 +- src/rolling_bloom_filter.nim | 37 ++++--- tests/test_bloom.nim | 63 ++++++------ tests/test_reliability.nim | 188 +++++++++++++++++++--------------- 8 files changed, 309 insertions(+), 273 deletions(-) diff --git a/src/bloom.nim b/src/bloom.nim index 7a92335..ea3b703 100644 --- a/src/bloom.nim +++ b/src/bloom.nim @@ -4,22 +4,21 @@ import strutils import results import private/probabilities -type - BloomFilter* = object - capacity*: int - errorRate*: float - kHashes*: int - mBits*: int - intArray*: seq[int] +type BloomFilter* = object + capacity*: int + errorRate*: float + kHashes*: int + mBits*: int + intArray*: seq[int] -{.push overflowChecks: off.} # Turn off overflow checks for hashing operations +{.push overflowChecks: off.} # Turn off overflow checks for hashing operations proc hashN(item: string, n: int, maxValue: int): int = ## Get the nth hash using Nim's built-in hash function using ## the double hashing technique from Kirsch and Mitzenmacher, 2008: ## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf let - hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes + hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes hashB = abs(hash(item & " b")) mod maxValue # string concatenation abs((hashA + n * hashB)) mod maxValue # # Use bit rotation for second hash instead of string concatenation if speed if preferred over FP-rate @@ -31,20 +30,24 @@ proc hashN(item: string, n: int, maxValue: int): int = {.pop.} -proc getMOverNBitsForK*(k: int, targetError: float, - probabilityTable = kErrors): Result[int, string] = +proc getMOverNBitsForK*( + k: int, targetError: float, probabilityTable = kErrors +): Result[int, string] = ## Returns the optimal number of m/n bits for a given k. - if k notin 0..12: + if k notin 0 .. 12: return err("K must be <= 12 if forceNBitsPerElem is not also specified.") - for mOverN in 2..probabilityTable[k].high: + for mOverN in 2 .. probabilityTable[k].high: if probabilityTable[k][mOverN] < targetError: return ok(mOverN) - err("Specified value of k and error rate not achievable using less than 4 bytes / element.") + err( + "Specified value of k and error rate not achievable using less than 4 bytes / element." + ) -proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, - forceNBitsPerElem = 0): Result[BloomFilter, string] = +proc initializeBloomFilter*( + capacity: int, errorRate: float, k = 0, forceNBitsPerElem = 0 +): Result[BloomFilter, string] = ## Initializes a Bloom filter with specified parameters. ## ## Parameters: @@ -76,25 +79,29 @@ proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, mBits = capacity * nBitsPerElem mInts = 1 + mBits div (sizeof(int) * 8) - ok(BloomFilter( - capacity: capacity, - errorRate: errorRate, - kHashes: kHashes, - mBits: mBits, - intArray: newSeq[int](mInts) - )) + ok( + BloomFilter( + capacity: capacity, + errorRate: errorRate, + kHashes: kHashes, + mBits: mBits, + intArray: newSeq[int](mInts), + ) + ) proc `$`*(bf: BloomFilter): string = ## Prints the configuration of the Bloom filter. "Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." % - [$bf.capacity, - formatFloat(bf.errorRate, format = ffScientific, precision = 1), - $bf.kHashes, - $(bf.mBits div bf.capacity)] + [ + $bf.capacity, + formatFloat(bf.errorRate, format = ffScientific, precision = 1), + $bf.kHashes, + $(bf.mBits div bf.capacity), + ] proc computeHashes(bf: BloomFilter, item: string): seq[int] = var hashes = newSeq[int](bf.kHashes) - for i in 0.. 12 let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01) check errorCase.isErr - check errorCase.error == "K must be <= 12 if forceNBitsPerElem is not also specified." + check errorCase.error == + "K must be <= 12 if forceNBitsPerElem is not also specified." # Test error case for unachievable error rate let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001) check errorCase2.isErr - check errorCase2.error == "Specified value of k and error rate not achievable using less than 4 bytes / element." + check errorCase2.error == + "Specified value of k and error rate not achievable using less than 4 bytes / element." # Test success cases let case1 = getMOverNBitsForK(k = 2, targetError = 0.1) @@ -93,50 +95,51 @@ suite "bloom filter": check bf3Result.isOk let bf3 = bf3Result.get let str = $bf3 - check str.contains("1000") # Capacity - check str.contains("4 hash") # Hash functions - check str.contains("1.0e-02") # Error rate in scientific notation + check str.contains("1000") # Capacity + check str.contains("4 hash") # Hash functions + check str.contains("1.0e-02") # Error rate in scientific notation suite "bloom filter special cases": test "different patterns of strings": const testSize = 10_000 - let patterns = @[ - "shortstr", - repeat("a", 1000), # Very long string - "special@#$%^&*()", # Special characters - "unicode→★∑≈", # Unicode characters - repeat("pattern", 10) # Repeating pattern - ] - + let patterns = + @[ + "shortstr", + repeat("a", 1000), # Very long string + "special@#$%^&*()", # Special characters + "unicode→★∑≈", # Unicode characters + repeat("pattern", 10), # Repeating pattern + ] + let bfResult = initializeBloomFilter(testSize, 0.01) check bfResult.isOk var bf = bfResult.get var inserted = newSeq[string](testSize) - + # Test pattern handling for pattern in patterns: bf.insert(pattern) assert bf.lookup(pattern), "failed lookup pattern: " & pattern - + # Test general insertion and lookup - for i in 0.. msg2 -> msg1 @@ -105,19 +108,19 @@ suite "Reliability Mechanisms": let msg2 = Message( messageId: id2, lamportTimestamp: 2, - causalHistory: @[id1], # msg2 depends on msg1 + causalHistory: @[id1], # msg2 depends on msg1 channelId: "testChannel", content: @[byte(2)], - bloomFilter: @[] + bloomFilter: @[], ) let msg3 = Message( messageId: id3, lamportTimestamp: 3, - causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 + causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 channelId: "testChannel", content: @[byte(3)], - bloomFilter: @[] + bloomFilter: @[], ) let serialized2 = serializeMessage(msg2) @@ -130,10 +133,10 @@ suite "Reliability Mechanisms": let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get()) check unwrapResult3.isOk() let (_, missingDeps3) = unwrapResult3.get() - + check: - missingDepsCount == 1 # Should trigger missing deps callback - missingDeps3.len == 2 # Should be missing both msg1 and msg2 + missingDepsCount == 1 # Should trigger missing deps callback + missingDeps3.len == 2 # Should be missing both msg1 and msg2 id1 in missingDeps3 id2 in missingDeps3 @@ -141,12 +144,12 @@ suite "Reliability Mechanisms": let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get()) check unwrapResult2.isOk() let (_, missingDeps2) = unwrapResult2.get() - + check: - missingDepsCount == 2 # Should have triggered another missing deps callback - missingDeps2.len == 1 # Should only be missing msg1 + missingDepsCount == 2 # Should have triggered another missing deps callback + missingDeps2.len == 1 # Should only be missing msg1 id1 in missingDeps2 - messageReadyCount == 0 # No messages should be ready yet + messageReadyCount == 0 # No messages should be ready yet # Mark first dependency (msg1) as met let markResult1 = rm.markDependenciesMet(@[id1]) @@ -156,8 +159,8 @@ suite "Reliability Mechanisms": check: incomingBuffer.len == 0 - messageReadyCount == 2 # Both msg2 and msg3 should be ready - missingDepsCount == 2 # Should still be 2 from the initial missing deps + messageReadyCount == 2 # Both msg2 and msg3 should be ready + missingDepsCount == 2 # Should still be 2 from the initial missing deps test "acknowledgment via causal history": var messageReadyCount = 0 @@ -165,9 +168,12 @@ suite "Reliability Mechanisms": var missingDepsCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = missingDepsCount += 1 + proc(messageId: MessageID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: MessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + missingDepsCount += 1, ) # Send our message @@ -180,30 +186,34 @@ suite "Reliability Mechanisms": let msg2 = Message( messageId: "msg2", lamportTimestamp: rm.lamportTimestamp + 1, - causalHistory: @[id1], # Include our message in causal history + causalHistory: @[id1], # Include our message in causal history channelId: "testChannel", content: @[byte(2)], - bloomFilter: @[] # Test with an empty bloom filter + bloomFilter: @[] # Test with an empty bloom filter + , ) - + let serializedMsg2 = serializeMessage(msg2) check serializedMsg2.isOk() # Process the "received" message - should trigger callbacks let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) check unwrapResult.isOk() - + check: - messageReadyCount == 1 # For msg2 which we "received" - messageSentCount == 1 # For msg1 which was acknowledged via causal history + messageReadyCount == 1 # For msg2 which we "received" + messageSentCount == 1 # For msg1 which was acknowledged via causal history test "acknowledgment via bloom filter": var messageSentCount = 0 - + rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + discard, ) # Send our message @@ -214,22 +224,20 @@ suite "Reliability Mechanisms": # Create a message with bloom filter containing our message var otherPartyBloomFilter = newRollingBloomFilter( - DefaultBloomFilterCapacity, - DefaultBloomFilterErrorRate, - DefaultBloomFilterWindow + DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate, DefaultBloomFilterWindow ) otherPartyBloomFilter.add(id1) - + let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter) check bfResult.isOk() let msg2 = Message( messageId: "msg2", lamportTimestamp: rm.lamportTimestamp + 1, - causalHistory: @[], # Empty causal history as we're using bloom filter + causalHistory: @[], # Empty causal history as we're using bloom filter channelId: "testChannel", content: @[byte(2)], - bloomFilter: bfResult.get() + bloomFilter: bfResult.get(), ) let serializedMsg2 = serializeMessage(msg2) @@ -237,8 +245,8 @@ suite "Reliability Mechanisms": let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) check unwrapResult.isOk() - - check messageSentCount == 1 # Our message should be acknowledged via bloom filter + + check messageSentCount == 1 # Our message should be acknowledged via bloom filter # Periodic task & Buffer management tests suite "Periodic Tasks & Buffer Management": @@ -255,15 +263,18 @@ suite "Periodic Tasks & Buffer Management": test "outgoing buffer management": var messageSentCount = 0 - + rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + discard, ) # Add multiple messages - for i in 0..5: + for i in 0 .. 5: let msg = @[byte(i)] let id = "msg" & $i let wrap = rm.wrapOutgoingMessage(msg, id) @@ -279,37 +290,41 @@ suite "Periodic Tasks & Buffer Management": causalHistory: @["msg0", "msg2", "msg4"], channelId: "testChannel", content: @[byte(100)], - bloomFilter: @[] + bloomFilter: @[], ) - + let serializedAck = serializeMessage(ackMsg) check serializedAck.isOk() - + # Process the acknowledgment discard rm.unwrapReceivedMessage(serializedAck.get()) - + let finalBuffer = rm.getOutgoingBuffer() check: - finalBuffer.len == 3 # Should have removed acknowledged messages - messageSentCount == 3 # Should have triggered sent callback for acknowledged messages + finalBuffer.len == 3 # Should have removed acknowledged messages + messageSentCount == 3 + # Should have triggered sent callback for acknowledged messages test "periodic buffer sweep and bloom clean": var messageSentCount = 0 - + var config = defaultConfig() - config.resendInterval = initDuration(milliseconds = 100) # Short for testing - config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps - config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window + config.resendInterval = initDuration(milliseconds = 100) # Short for testing + config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps + config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window config.maxResendAttempts = 3 # Set a low number of max attempts - + let rmResultP = newReliabilityManager("testChannel", config) check rmResultP.isOk() let rm = rmResultP.get() - + rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + discard, ) # First message - should be cleaned from bloom filter later @@ -324,10 +339,10 @@ suite "Periodic Tasks & Buffer Management": rm.bloomFilter.contains(id1) rm.startPeriodicTasks() - + # Wait long enough for bloom filter window to pass and first message to exceed max retries waitFor sleepAsync(chronos.milliseconds(500)) - + # Add new message let msg2 = @[byte(2)] let id2 = "msg2" @@ -336,27 +351,32 @@ suite "Periodic Tasks & Buffer Management": let finalBuffer = rm.getOutgoingBuffer() check: - finalBuffer.len == 1 # Only msg2 should be in buffer, msg1 should be removed after max retries + finalBuffer.len == 1 + # Only msg2 should be in buffer, msg1 should be removed after max retries finalBuffer[0].message.messageId == id2 # Verify it's the second message - finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts - not rm.bloomFilter.contains(id1) # Bloom filter cleaning check - rm.bloomFilter.contains(id2) # New message still in filter + finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts + not rm.bloomFilter.contains(id1) # Bloom filter cleaning check + rm.bloomFilter.contains(id2) # New message still in filter rm.cleanup() test "periodic sync callback": var syncCallCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard, - proc() {.gcsafe.} = syncCallCount += 1 + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + discard, + proc() {.gcsafe.} = + syncCallCount += 1, ) rm.startPeriodicTasks() waitFor sleepAsync(chronos.seconds(1)) rm.cleanup() - + check syncCallCount > 0 # Special cases handling @@ -374,12 +394,12 @@ suite "Special Cases Handling": test "message history limits": # Add messages up to max history size - for i in 0..rm.config.maxMessageHistory + 5: + for i in 0 .. rm.config.maxMessageHistory + 5: let msg = @[byte(i)] let id = "msg" & $i let wrap = rm.wrapOutgoingMessage(msg, id) check wrap.isOk() - + let history = rm.getMessageHistory() check: history.len <= rm.config.maxMessageHistory @@ -392,7 +412,8 @@ suite "Special Cases Handling": causalHistory: @[], channelId: "testChannel", content: @[byte(1)], - bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data + bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data + , ) let serializedInvalid = serializeMessage(msgInvalid) @@ -402,14 +423,17 @@ suite "Special Cases Handling": let result = rm.unwrapReceivedMessage(serializedInvalid.get()) check: result.isOk() - result.get()[1].len == 0 # No missing dependencies + result.get()[1].len == 0 # No missing dependencies test "duplicate message handling": var messageReadyCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: MessageID) {.gcsafe.} = discard, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard + proc(messageId: MessageID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: MessageID) {.gcsafe.} = + discard, + proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + discard, ) # Create and process a message @@ -419,7 +443,7 @@ suite "Special Cases Handling": causalHistory: @[], channelId: "testChannel", content: @[byte(1)], - bloomFilter: @[] + bloomFilter: @[], ) let serialized = serializeMessage(msg) @@ -431,8 +455,8 @@ suite "Special Cases Handling": let result2 = rm.unwrapReceivedMessage(serialized.get()) check: result2.isOk() - result2.get()[1].len == 0 # No missing deps on second process - messageReadyCount == 1 # Message should only be processed once + result2.get()[1].len == 0 # No missing deps on second process + messageReadyCount == 1 # Message should only be processed once test "error handling": # Empty message @@ -466,4 +490,4 @@ suite "cleanup": let history = rm.getMessageHistory() check: outBuffer.len == 0 - history.len == 0 \ No newline at end of file + history.len == 0