diff --git a/.gitignore b/.gitignore index cfc9510..1431936 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ nimcache nimcache/* -tests/bloom +tests/test_bloom nim-bloom/bloom .DS_Store -src/.DS_Store \ No newline at end of file +src/.DS_Store +nph \ No newline at end of file diff --git a/reliability.nimble b/reliability.nimble new file mode 100644 index 0000000..8bc19c8 --- /dev/null +++ b/reliability.nimble @@ -0,0 +1,16 @@ +# Package +version = "0.1.0" +author = "Waku Team" +description = "E2E Reliability Protocol API" +license = "MIT" +srcDir = "src" + +# Dependencies +requires "nim >= 2.0.8" +requires "chronicles" +requires "libp2p" + +# Tasks +task test, "Run the test suite": + exec "nim c -r tests/test_bloom.nim" + exec "nim c -r tests/test_reliability.nim" diff --git a/src/bloom.nim b/src/bloom.nim index 92b0712..ea3b703 100644 --- a/src/bloom.nim +++ b/src/bloom.nim @@ -4,22 +4,21 @@ import strutils import results import private/probabilities -type - BloomFilter* = object - capacity*: int - errorRate*: float - kHashes*: int - mBits*: int - intArray: seq[int] +type BloomFilter* = object + capacity*: int + errorRate*: float + kHashes*: int + mBits*: int + intArray*: seq[int] -{.push overflowChecks: off.} # Turn off overflow checks for hashing operations +{.push overflowChecks: off.} # Turn off overflow checks for hashing operations proc hashN(item: string, n: int, maxValue: int): int = ## Get the nth hash using Nim's built-in hash function using ## the double hashing technique from Kirsch and Mitzenmacher, 2008: ## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf let - hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes + hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes hashB = abs(hash(item & " b")) mod maxValue # string concatenation abs((hashA + n * hashB)) mod maxValue # # Use bit rotation for second hash instead of string concatenation if speed if preferred over FP-rate @@ -31,20 +30,24 @@ proc hashN(item: string, n: int, maxValue: int): int = {.pop.} -proc getMOverNBitsForK*(k: int, targetError: float, - probabilityTable = kErrors): Result[int, string] = +proc getMOverNBitsForK*( + k: int, targetError: float, probabilityTable = kErrors +): Result[int, string] = ## Returns the optimal number of m/n bits for a given k. - if k notin 0..12: + if k notin 0 .. 12: return err("K must be <= 12 if forceNBitsPerElem is not also specified.") - for mOverN in 2..probabilityTable[k].high: + for mOverN in 2 .. probabilityTable[k].high: if probabilityTable[k][mOverN] < targetError: return ok(mOverN) - err("Specified value of k and error rate not achievable using less than 4 bytes / element.") + err( + "Specified value of k and error rate not achievable using less than 4 bytes / element." + ) -proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, - forceNBitsPerElem = 0): Result[BloomFilter, string] = +proc initializeBloomFilter*( + capacity: int, errorRate: float, k = 0, forceNBitsPerElem = 0 +): Result[BloomFilter, string] = ## Initializes a Bloom filter with specified parameters. ## ## Parameters: @@ -76,25 +79,29 @@ proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0, mBits = capacity * nBitsPerElem mInts = 1 + mBits div (sizeof(int) * 8) - ok(BloomFilter( - capacity: capacity, - errorRate: errorRate, - kHashes: kHashes, - mBits: mBits, - intArray: newSeq[int](mInts) - )) + ok( + BloomFilter( + capacity: capacity, + errorRate: errorRate, + kHashes: kHashes, + mBits: mBits, + intArray: newSeq[int](mInts), + ) + ) proc `$`*(bf: BloomFilter): string = ## Prints the configuration of the Bloom filter. "Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." % - [$bf.capacity, - formatFloat(bf.errorRate, format = ffScientific, precision = 1), - $bf.kHashes, - $(bf.mBits div bf.capacity)] + [ + $bf.capacity, + formatFloat(bf.errorRate, format = ffScientific, precision = 1), + $bf.kHashes, + $(bf.mBits div bf.capacity), + ] proc computeHashes(bf: BloomFilter, item: string): seq[int] = var hashes = newSeq[int](bf.kHashes) - for i in 0.. rm.config.maxMessageHistory: + rm.messageHistory.delete(0) + +proc updateLamportTimestamp*( + rm: ReliabilityManager, msgTs: int64 +) {.gcsafe, raises: [].} = + rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1 + +proc getRecentSdsMessageIDs*(rm: ReliabilityManager, n: int): seq[SdsMessageID] = + result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1] + +proc getMessageHistory*(rm: ReliabilityManager): seq[SdsMessageID] = + withLock rm.lock: + result = rm.messageHistory + +proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] = + withLock rm.lock: + result = rm.outgoingBuffer + +proc getIncomingBuffer*(rm: ReliabilityManager): seq[SdsMessage] = + withLock rm.lock: + result = rm.incomingBuffer diff --git a/src/rolling_bloom_filter.nim b/src/rolling_bloom_filter.nim new file mode 100644 index 0000000..190ab8a --- /dev/null +++ b/src/rolling_bloom_filter.nim @@ -0,0 +1,118 @@ +import chronos +import chronicles +import ./[bloom, message] + +type RollingBloomFilter* = object + filter*: BloomFilter + capacity*: int + minCapacity*: int + maxCapacity*: int + messages*: seq[SdsMessageID] + +const + DefaultBloomFilterCapacity* = 10000 + DefaultBloomFilterErrorRate* = 0.001 + CapacityFlexPercent* = 20 + +proc newRollingBloomFilter*( + capacity: int = DefaultBloomFilterCapacity, + errorRate: float = DefaultBloomFilterErrorRate, +): RollingBloomFilter {.gcsafe.} = + let targetCapacity = if capacity <= 0: DefaultBloomFilterCapacity else: capacity + let targetError = + if errorRate <= 0.0 or errorRate >= 1.0: DefaultBloomFilterErrorRate else: errorRate + + let filterResult = initializeBloomFilter(targetCapacity, targetError) + if filterResult.isErr: + error "Failed to initialize bloom filter", error = filterResult.error + # Try with default values if custom values failed + if capacity != DefaultBloomFilterCapacity or errorRate != DefaultBloomFilterErrorRate: + let defaultResult = + initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) + if defaultResult.isErr: + error "Failed to initialize bloom filter with default parameters", + error = defaultResult.error + + let minCapacity = ( + DefaultBloomFilterCapacity.float * (100 - CapacityFlexPercent).float / 100.0 + ).int + let maxCapacity = ( + DefaultBloomFilterCapacity.float * (100 + CapacityFlexPercent).float / 100.0 + ).int + + info "Successfully initialized bloom filter with default parameters", + capacity = DefaultBloomFilterCapacity, + minCapacity = minCapacity, + maxCapacity = maxCapacity + + return RollingBloomFilter( + filter: defaultResult.get(), + capacity: DefaultBloomFilterCapacity, + minCapacity: minCapacity, + maxCapacity: maxCapacity, + messages: @[], + ) + else: + error "Could not create bloom filter", error = filterResult.error + + let minCapacity = + (targetCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int + let maxCapacity = + (targetCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int + + info "Successfully initialized bloom filter", + capacity = targetCapacity, minCapacity = minCapacity, maxCapacity = maxCapacity + + return RollingBloomFilter( + filter: filterResult.get(), + capacity: targetCapacity, + minCapacity: minCapacity, + maxCapacity: maxCapacity, + messages: @[], + ) + +proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} = + try: + if rbf.messages.len <= rbf.maxCapacity: + return # Don't clean unless we exceed max capacity + + # Initialize new filter + var newFilter = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate).valueOr: + error "Failed to create new bloom filter", error = $error + return + + # Keep most recent messages up to minCapacity + let keepCount = rbf.minCapacity + let startIdx = max(0, rbf.messages.len - keepCount) + var newMessages: seq[SdsMessageID] = @[] + + for i in startIdx ..< rbf.messages.len: + newMessages.add(rbf.messages[i]) + newFilter.insert(cast[string](rbf.messages[i])) + + rbf.messages = newMessages + rbf.filter = newFilter + except Exception: + error "Failed to clean bloom filter", error = getCurrentExceptionMsg() + +proc add*(rbf: var RollingBloomFilter, messageId: SdsMessageID) {.gcsafe.} = + ## Adds a message ID to the rolling bloom filter. + ## + ## Parameters: + ## - messageId: The ID of the message to add. + rbf.filter.insert(cast[string](messageId)) + rbf.messages.add(messageId) + + # Clean if we exceed max capacity + if rbf.messages.len > rbf.maxCapacity: + rbf.clean() + +proc contains*(rbf: RollingBloomFilter, messageId: SdsMessageID): bool = + ## Checks if a message ID is in the rolling bloom filter. + ## + ## Parameters: + ## - messageId: The ID of the message to check. + ## + ## Returns: + ## True if the message ID is probably in the filter, false otherwise. + rbf.filter.lookup(cast[string](messageId)) diff --git a/tests/test_bloom.nim b/tests/test_bloom.nim index 7da555c..540735d 100644 --- a/tests/test_bloom.nim +++ b/tests/test_bloom.nim @@ -1,6 +1,7 @@ import unittest, results, strutils import ../src/bloom from random import rand, randomize +import ../src/[message, protobuf, protobufutil, reliability_utils, rolling_bloom_filter] suite "bloom filter": setup: @@ -13,9 +14,9 @@ suite "bloom filter": sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" testElements = newSeq[string](nElementsToTest) - for i in 0.. 12 let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01) check errorCase.isErr - check errorCase.error == "K must be <= 12 if forceNBitsPerElem is not also specified." + check errorCase.error == + "K must be <= 12 if forceNBitsPerElem is not also specified." # Test error case for unachievable error rate let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001) check errorCase2.isErr - check errorCase2.error == "Specified value of k and error rate not achievable using less than 4 bytes / element." + check errorCase2.error == + "Specified value of k and error rate not achievable using less than 4 bytes / element." # Test success cases let case1 = getMOverNBitsForK(k = 2, targetError = 0.1) @@ -93,50 +96,51 @@ suite "bloom filter": check bf3Result.isOk let bf3 = bf3Result.get let str = $bf3 - check str.contains("1000") # Capacity - check str.contains("4 hash") # Hash functions - check str.contains("1.0e-02") # Error rate in scientific notation + check str.contains("1000") # Capacity + check str.contains("4 hash") # Hash functions + check str.contains("1.0e-02") # Error rate in scientific notation suite "bloom filter special cases": test "different patterns of strings": const testSize = 10_000 - let patterns = @[ - "shortstr", - repeat("a", 1000), # Very long string - "special@#$%^&*()", # Special characters - "unicode→★∑≈", # Unicode characters - repeat("pattern", 10) # Repeating pattern - ] - + let patterns = + @[ + "shortstr", + repeat("a", 1000), # Very long string + "special@#$%^&*()", # Special characters + "unicode→★∑≈", # Unicode characters + repeat("pattern", 10), # Repeating pattern + ] + let bfResult = initializeBloomFilter(testSize, 0.01) check bfResult.isOk var bf = bfResult.get var inserted = newSeq[string](testSize) - + # Test pattern handling for pattern in patterns: bf.insert(pattern) assert bf.lookup(pattern), "failed lookup pattern: " & pattern - + # Test general insertion and lookup - for i in 0..