fix: fix nph formatting

This commit is contained in:
jm-clius 2025-05-28 17:56:13 +01:00
parent 69a1872a2e
commit 32146675f0
No known key found for this signature in database
GPG Key ID: 5FCD9D5211B952DA
8 changed files with 309 additions and 273 deletions

View File

@ -4,22 +4,21 @@ import strutils
import results
import private/probabilities
type
BloomFilter* = object
capacity*: int
errorRate*: float
kHashes*: int
mBits*: int
intArray*: seq[int]
type BloomFilter* = object
capacity*: int
errorRate*: float
kHashes*: int
mBits*: int
intArray*: seq[int]
{.push overflowChecks: off.} # Turn off overflow checks for hashing operations
{.push overflowChecks: off.} # Turn off overflow checks for hashing operations
proc hashN(item: string, n: int, maxValue: int): int =
## Get the nth hash using Nim's built-in hash function using
## the double hashing technique from Kirsch and Mitzenmacher, 2008:
## http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/rsa.pdf
let
hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes
hashA = abs(hash(item)) mod maxValue # Use abs to handle negative hashes
hashB = abs(hash(item & " b")) mod maxValue # string concatenation
abs((hashA + n * hashB)) mod maxValue
# # Use bit rotation for second hash instead of string concatenation if speed is preferred over FP-rate
@ -31,20 +30,24 @@ proc hashN(item: string, n: int, maxValue: int): int =
{.pop.}
proc getMOverNBitsForK*(k: int, targetError: float,
probabilityTable = kErrors): Result[int, string] =
proc getMOverNBitsForK*(
k: int, targetError: float, probabilityTable = kErrors
): Result[int, string] =
## Returns the optimal number of m/n bits for a given k.
if k notin 0..12:
if k notin 0 .. 12:
return err("K must be <= 12 if forceNBitsPerElem is not also specified.")
for mOverN in 2..probabilityTable[k].high:
for mOverN in 2 .. probabilityTable[k].high:
if probabilityTable[k][mOverN] < targetError:
return ok(mOverN)
err("Specified value of k and error rate not achievable using less than 4 bytes / element.")
err(
"Specified value of k and error rate not achievable using less than 4 bytes / element."
)
proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
forceNBitsPerElem = 0): Result[BloomFilter, string] =
proc initializeBloomFilter*(
capacity: int, errorRate: float, k = 0, forceNBitsPerElem = 0
): Result[BloomFilter, string] =
## Initializes a Bloom filter with specified parameters.
##
## Parameters:
@ -76,25 +79,29 @@ proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
mBits = capacity * nBitsPerElem
mInts = 1 + mBits div (sizeof(int) * 8)
ok(BloomFilter(
capacity: capacity,
errorRate: errorRate,
kHashes: kHashes,
mBits: mBits,
intArray: newSeq[int](mInts)
))
ok(
BloomFilter(
capacity: capacity,
errorRate: errorRate,
kHashes: kHashes,
mBits: mBits,
intArray: newSeq[int](mInts),
)
)
proc `$`*(bf: BloomFilter): string =
## Prints the configuration of the Bloom filter.
"Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." %
[$bf.capacity,
formatFloat(bf.errorRate, format = ffScientific, precision = 1),
$bf.kHashes,
$(bf.mBits div bf.capacity)]
[
$bf.capacity,
formatFloat(bf.errorRate, format = ffScientific, precision = 1),
$bf.kHashes,
$(bf.mBits div bf.capacity),
]
proc computeHashes(bf: BloomFilter, item: string): seq[int] =
var hashes = newSeq[int](bf.kHashes)
for i in 0..<bf.kHashes:
for i in 0 ..< bf.kHashes:
hashes[i] = hashN(item, i, bf.mBits)
hashes
@ -120,4 +127,4 @@ proc lookup*(bf: BloomFilter, item: string): bool =
currentInt = bf.intArray[intAddress]
if currentInt != (currentInt or (1 shl bitOffset)):
return false
true
true

View File

@ -27,4 +27,4 @@ const
DefaultMaxResendAttempts* = 5
DefaultSyncMessageInterval* = initDuration(seconds = 30)
DefaultBufferSweepInterval* = initDuration(seconds = 60)
MaxMessageSize* = 1024 * 1024 # 1 MB
MaxMessageSize* = 1024 * 1024 # 1 MB

View File

@ -7,94 +7,97 @@
type
TErrorForK = seq[float]
TAllErrorRates* = array[0..12, TErrorForK]
TAllErrorRates* = array[0 .. 12, TErrorForK]
var kErrors* {.threadvar.}: TAllErrorRates
kErrors = [
@[1.0],
@[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000,
0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000,
0.0869000000, 0.0800000000, 0.0740000000, 0.0689000000, 0.0645000000,
0.0606000000, 0.0571000000, 0.0540000000, 0.0513000000, 0.0488000000,
0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000, 0.0392000000,
0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000,
0.0317000000, 0.0308000000],
@[1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000,
0.0804000000, 0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000,
0.0276000000, 0.0236000000, 0.0203000000, 0.0177000000, 0.0156000000,
0.0138000000, 0.0123000000, 0.0111000000, 0.0099800000, 0.0090600000,
0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000, 0.0059100000,
0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000,
0.0039000000, 0.0036700000],
@[1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000,
0.0423000000, 0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000,
0.0108000000, 0.0087500000, 0.0071800000, 0.0059600000, 0.0050000000,
0.0042300000, 0.0036200000, 0.0031200000, 0.0027000000, 0.0023600000,
0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000, 0.0012900000,
0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000,
0.0007170000],
@[1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000,
0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000,
0.0049200000, 0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000,
0.0015800000, 0.0013000000, 0.0010800000, 0.0009050000, 0.0007640000,
0.0006490000, 0.0005550000, 0.0004780000, 0.0004130000, 0.0003590000,
0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000, 0.0001910000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000,
0.0217000000, 0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000,
0.0033200000, 0.0024400000, 0.0018300000, 0.0013900000, 0.0010700000,
0.0008390000, 0.0006630000, 0.0005300000, 0.0004270000, 0.0003470000,
0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000, 0.0001380000,
0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000,
0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000,
0.0017900000, 0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000,
0.0003940000, 0.0003030000, 0.0002360000, 0.0001850000, 0.0001470000,
0.0001170000, 0.0000944000, 0.0000766000, 0.0000626000, 0.0000515000,
0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000],
@[1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000,
0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000,
0.0007020000, 0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000,
0.0001470000, 0.0001120000, 0.0000856000, 0.0000663000, 0.0000518000,
0.0000408000, 0.0000324000, 0.0000259000, 0.0000209000, 0.0000169000,
0.0000138000, 0.0000113000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 0.0145000000, 0.0084600000, 0.0050900000, 0.0031400000, 0.0019900000,
0.0012900000, 0.0008520000, 0.0005740000, 0.0003940000, 0.0002750000,
0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000, 0.0000555000,
0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300],
@[1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000,
0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000,
0.0002260000, 0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000,
0.0000392000, 0.0000286000, 0.0000211000, 0.0000157000, 0.0000118000,
0.0000089600, 0.0000068500, 0.0000052800, 0.0000041000, 0.0000032000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000,
0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000,
0.0001980000, 0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000,
0.0000297000, 0.0000211000, 0.0000152000, 0.0000110000, 0.0000080700,
0.0000059700, 0.0000044500, 0.0000033500, 0.0000025400, 0.0000019400],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0021000000, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000,
0.0001830000, 0.0001180000, 0.0000777000, 0.0000518000, 0.0000350000,
0.0000240000, 0.0000166000, 0.0000116000, 0.0000082300, 0.0000058900,
0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900, 0.0000012600],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000,
0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000,
0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900,
0.0000016500, 0.0000012000, 0.0000008740]
@[
1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000,
0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.0869000000, 0.0800000000,
0.0740000000, 0.0689000000, 0.0645000000, 0.0606000000, 0.0571000000, 0.0540000000,
0.0513000000, 0.0488000000, 0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000,
0.0392000000, 0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000,
0.0317000000, 0.0308000000,
],
@[
1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, 0.0804000000,
0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 0.0276000000, 0.0236000000,
0.0203000000, 0.0177000000, 0.0156000000, 0.0138000000, 0.0123000000, 0.0111000000,
0.0099800000, 0.0090600000, 0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000,
0.0059100000, 0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000,
0.0039000000, 0.0036700000,
],
@[
1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, 0.0423000000,
0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 0.0108000000, 0.0087500000,
0.0071800000, 0.0059600000, 0.0050000000, 0.0042300000, 0.0036200000, 0.0031200000,
0.0027000000, 0.0023600000, 0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000,
0.0012900000, 0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000,
0.0007170000,
],
@[
1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000,
0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 0.0049200000,
0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0015800000, 0.0013000000,
0.0010800000, 0.0009050000, 0.0007640000, 0.0006490000, 0.0005550000, 0.0004780000,
0.0004130000, 0.0003590000, 0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000,
0.0001910000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000, 0.0217000000,
0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000, 0.0033200000, 0.0024400000,
0.0018300000, 0.0013900000, 0.0010700000, 0.0008390000, 0.0006630000, 0.0005300000,
0.0004270000, 0.0003470000, 0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000,
0.0001380000, 0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000,
0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000, 0.0017900000,
0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000, 0.0003940000, 0.0003030000,
0.0002360000, 0.0001850000, 0.0001470000, 0.0001170000, 0.0000944000, 0.0000766000,
0.0000626000, 0.0000515000, 0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000,
0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000, 0.0007020000,
0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000, 0.0001470000, 0.0001120000,
0.0000856000, 0.0000663000, 0.0000518000, 0.0000408000, 0.0000324000, 0.0000259000,
0.0000209000, 0.0000169000, 0.0000138000, 0.0000113000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0145000000, 0.0084600000,
0.0050900000, 0.0031400000, 0.0019900000, 0.0012900000, 0.0008520000, 0.0005740000,
0.0003940000, 0.0002750000, 0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000,
0.0000555000, 0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000,
0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000, 0.0002260000,
0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000, 0.0000392000, 0.0000286000,
0.0000211000, 0.0000157000, 0.0000118000, 0.0000089600, 0.0000068500, 0.0000052800,
0.0000041000, 0.0000032000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000,
0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000, 0.0001980000,
0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000, 0.0000297000, 0.0000211000,
0.0000152000, 0.0000110000, 0.0000080700, 0.0000059700, 0.0000044500, 0.0000033500,
0.0000025400, 0.0000019400,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0021000000,
0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000, 0.0001830000, 0.0001180000,
0.0000777000, 0.0000518000, 0.0000350000, 0.0000240000, 0.0000166000, 0.0000116000,
0.0000082300, 0.0000058900, 0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900,
0.0000012600,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000, 0.0000712000,
0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, 0.0000094200, 0.0000065200,
0.0000045600, 0.0000032200, 0.0000022900, 0.0000016500, 0.0000012000, 0.0000008740,
],
]

View File

@ -8,18 +8,18 @@ proc toBytes(s: string): seq[byte] =
proc encode*(msg: Message): ProtoBuffer =
var pb = initProtoBuffer()
pb.write(1, msg.messageId)
pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory:
pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling
pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling
pb.write(4, msg.channelId)
pb.write(5, msg.content)
pb.write(6, msg.bloomFilter)
pb.finish()
pb
proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
@ -47,11 +47,11 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
return err(ProtobufError.missingRequiredField("content"))
if not ?pb.getField(6, msg.bloomFilter):
msg.bloomFilter = @[] # Empty if not present
msg.bloomFilter = @[] # Empty if not present
ok(msg)
proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] =
proc serializeMessage*(msg: Message): Result[seq[byte], ReliabilityError] =
try:
let pb = encode(msg)
ok(pb.buffer)
@ -71,19 +71,19 @@ proc deserializeMessage*(data: seq[byte]): Result[Message, ReliabilityError] =
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
try:
var pb = initProtoBuffer()
# Convert intArray to bytes
var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
for i, val in filter.intArray:
let start = i * sizeof(int)
copyMem(addr bytes[start], unsafeAddr val, sizeof(int))
pb.write(1, bytes)
pb.write(2, uint64(filter.capacity))
pb.write(3, uint64(filter.errorRate * 1_000_000))
pb.write(4, uint64(filter.kHashes))
pb.write(5, uint64(filter.mBits))
pb.finish()
ok(pb.buffer)
except:
@ -92,31 +92,31 @@ proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityEr
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] =
if data.len == 0:
return err(reDeserializationError)
try:
let pb = initProtoBuffer(data)
var bytes: seq[byte]
var cap, errRate, kHashes, mBits: uint64
if not pb.getField(1, bytes).get() or
not pb.getField(2, cap).get() or
not pb.getField(3, errRate).get() or
not pb.getField(4, kHashes).get() or
not pb.getField(5, mBits).get():
if not pb.getField(1, bytes).get() or not pb.getField(2, cap).get() or
not pb.getField(3, errRate).get() or not pb.getField(4, kHashes).get() or
not pb.getField(5, mBits).get():
return err(reDeserializationError)
# Convert bytes back to intArray
var intArray = newSeq[int](bytes.len div sizeof(int))
for i in 0 ..< intArray.len:
let start = i * sizeof(int)
copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int))
ok(BloomFilter(
intArray: intArray,
capacity: int(cap),
errorRate: float(errRate) / 1_000_000,
kHashes: int(kHashes),
mBits: int(mBits)
))
ok(
BloomFilter(
intArray: intArray,
capacity: int(cap),
errorRate: float(errRate) / 1_000_000,
kHashes: int(kHashes),
mBits: int(mBits),
)
)
except:
err(reDeserializationError)
err(reDeserializationError)

View File

@ -29,4 +29,4 @@ converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError =
ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err)
proc missingRequiredField*(T: type ProtobufError, field: string): T =
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)

View File

@ -3,11 +3,10 @@ import chronos
import chronicles
import ./[bloom, message]
type
RollingBloomFilter* = object
filter*: BloomFilter
window*: times.Duration
messages*: seq[TimestampedMessageID]
type RollingBloomFilter* = object
filter*: BloomFilter
window*: times.Duration
messages*: seq[TimestampedMessageID]
const
DefaultBloomFilterCapacity* = 10000
@ -20,34 +19,33 @@ proc logError*(msg: string) =
proc logInfo*(msg: string) =
info "ReliabilityInfo", message = msg
proc newRollingBloomFilter*(capacity: int, errorRate: float, window: times.Duration): RollingBloomFilter {.gcsafe.} =
proc newRollingBloomFilter*(
capacity: int, errorRate: float, window: times.Duration
): RollingBloomFilter {.gcsafe.} =
try:
var filterResult: Result[BloomFilter, string]
{.gcsafe.}:
filterResult = initializeBloomFilter(capacity, errorRate)
if filterResult.isOk:
logInfo("Successfully initialized bloom filter")
return RollingBloomFilter(
filter: filterResult.get(), # Extract the BloomFilter from Result
window: window,
messages: @[]
messages: @[],
)
else:
logError("Failed to initialize bloom filter: " & filterResult.error)
# Fall through to default case below
except:
logError("Failed to initialize bloom filter")
# Default fallback case
let defaultResult = initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
let defaultResult =
initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isOk:
return RollingBloomFilter(
filter: defaultResult.get(),
window: window,
messages: @[]
)
return
RollingBloomFilter(filter: defaultResult.get(), window: window, messages: @[])
else:
# If even default initialization fails, raise an exception
logError("Failed to initialize bloom filter with default parameters")
@ -75,9 +73,10 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
let now = getTime()
let cutoff = now - rbf.window
var newMessages: seq[TimestampedMessageID] = @[]
# Initialize new filter
let newFilterResult = initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate)
let newFilterResult =
initializeBloomFilter(rbf.filter.capacity, rbf.filter.errorRate)
if newFilterResult.isErr:
logError("Failed to create new bloom filter: " & newFilterResult.error)
return
@ -92,4 +91,4 @@ proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
rbf.messages = newMessages
rbf.filter = newFilter
except Exception as e:
logError("Failed to clean bloom filter: " & e.msg)
logError("Failed to clean bloom filter: " & e.msg)

View File

@ -13,9 +13,9 @@ suite "bloom filter":
sampleChars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
testElements = newSeq[string](nElementsToTest)
for i in 0..<nElementsToTest:
for i in 0 ..< nElementsToTest:
var newString = ""
for j in 0..7:
for j in 0 .. 7:
newString.add(sampleChars[rand(51)])
testElements[i] = newString
@ -26,11 +26,11 @@ suite "bloom filter":
check bf.capacity == nElementsToTest
check bf.errorRate == 0.001
check bf.kHashes == 10
check bf.mBits div bf.capacity == 15 # bits per element
check bf.mBits div bf.capacity == 15 # bits per element
test "basic operations":
check bf.lookup("nonexistent") == false # Test empty lookup
check bf.lookup("nonexistent") == false # Test empty lookup
let bf2Result = initializeBloomFilter(100, 0.01)
check bf2Result.isOk
var bf2 = bf2Result.get
@ -41,16 +41,16 @@ suite "bloom filter":
test "error rate":
var falsePositives = 0
let testSize = nElementsToTest div 2
for i in 0..<testSize:
for i in 0 ..< testSize:
var testString = ""
for j in 0..8: # Different length than setup
for j in 0 .. 8: # Different length than setup
testString.add(sampleChars[rand(51)])
if bf.lookup(testString):
falsePositives.inc()
let actualErrorRate = falsePositives.float / testSize.float
check actualErrorRate < bf.errorRate * 1.5 # Allow some margin
check actualErrorRate < bf.errorRate * 1.5 # Allow some margin
test "perfect recall":
var lookupErrors = 0
for item in testElements:
@ -62,12 +62,14 @@ suite "bloom filter":
# Test error case for k > 12
let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01)
check errorCase.isErr
check errorCase.error == "K must be <= 12 if forceNBitsPerElem is not also specified."
check errorCase.error ==
"K must be <= 12 if forceNBitsPerElem is not also specified."
# Test error case for unachievable error rate
let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001)
check errorCase2.isErr
check errorCase2.error == "Specified value of k and error rate not achievable using less than 4 bytes / element."
check errorCase2.error ==
"Specified value of k and error rate not achievable using less than 4 bytes / element."
# Test success cases
let case1 = getMOverNBitsForK(k = 2, targetError = 0.1)
@ -93,50 +95,51 @@ suite "bloom filter":
check bf3Result.isOk
let bf3 = bf3Result.get
let str = $bf3
check str.contains("1000") # Capacity
check str.contains("4 hash") # Hash functions
check str.contains("1.0e-02") # Error rate in scientific notation
check str.contains("1000") # Capacity
check str.contains("4 hash") # Hash functions
check str.contains("1.0e-02") # Error rate in scientific notation
suite "bloom filter special cases":
test "different patterns of strings":
const testSize = 10_000
let patterns = @[
"shortstr",
repeat("a", 1000), # Very long string
"special@#$%^&*()", # Special characters
"unicode→★∑≈", # Unicode characters
repeat("pattern", 10) # Repeating pattern
]
let patterns =
@[
"shortstr",
repeat("a", 1000), # Very long string
"special@#$%^&*()", # Special characters
"unicode→★∑≈", # Unicode characters
repeat("pattern", 10), # Repeating pattern
]
let bfResult = initializeBloomFilter(testSize, 0.01)
check bfResult.isOk
var bf = bfResult.get
var inserted = newSeq[string](testSize)
# Test pattern handling
for pattern in patterns:
bf.insert(pattern)
assert bf.lookup(pattern), "failed lookup pattern: " & pattern
# Test general insertion and lookup
for i in 0..<testSize:
for i in 0 ..< testSize:
inserted[i] = $i & "test" & $rand(1000)
bf.insert(inserted[i])
# Verify all insertions
var lookupErrors = 0
for item in inserted:
if not bf.lookup(item):
lookupErrors.inc()
check lookupErrors == 0
# Check false positive rate
var falsePositives = 0
let fpTestSize = testSize div 2
for i in 0..<fpTestSize:
for i in 0 ..< fpTestSize:
let testItem = "notpresent" & $i & $rand(1000)
if bf.lookup(testItem):
falsePositives.inc()
let fpRate = falsePositives.float / fpTestSize.float
check fpRate < bf.errorRate * 1.5 # Allow some margin but should be close to target
check fpRate < bf.errorRate * 1.5 # Allow some margin but should be close to target

View File

@ -25,7 +25,7 @@ suite "Core Operations":
test "basic message wrapping and unwrapping":
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
let wrappedResult = rm.wrapOutgoingMessage(msg, msgId)
check wrappedResult.isOk()
let wrapped = wrappedResult.get()
@ -46,7 +46,7 @@ suite "Core Operations":
causalHistory: @[],
channelId: "testChannel",
content: @[byte(1)],
bloomFilter: @[]
bloomFilter: @[],
)
let msg2 = Message(
@ -55,7 +55,7 @@ suite "Core Operations":
causalHistory: @[],
channelId: "testChannel",
content: @[byte(2)],
bloomFilter: @[]
bloomFilter: @[],
)
let serialized1 = serializeMessage(msg1)
@ -91,9 +91,12 @@ suite "Reliability Mechanisms":
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = missingDepsCount += 1
proc(messageId: MessageID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
missingDepsCount += 1,
)
# Create dependency chain: msg3 -> msg2 -> msg1
@ -105,19 +108,19 @@ suite "Reliability Mechanisms":
let msg2 = Message(
messageId: id2,
lamportTimestamp: 2,
causalHistory: @[id1], # msg2 depends on msg1
causalHistory: @[id1], # msg2 depends on msg1
channelId: "testChannel",
content: @[byte(2)],
bloomFilter: @[]
bloomFilter: @[],
)
let msg3 = Message(
messageId: id3,
lamportTimestamp: 3,
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
channelId: "testChannel",
content: @[byte(3)],
bloomFilter: @[]
bloomFilter: @[],
)
let serialized2 = serializeMessage(msg2)
@ -130,10 +133,10 @@ suite "Reliability Mechanisms":
let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get())
check unwrapResult3.isOk()
let (_, missingDeps3) = unwrapResult3.get()
check:
missingDepsCount == 1 # Should trigger missing deps callback
missingDeps3.len == 2 # Should be missing both msg1 and msg2
missingDepsCount == 1 # Should trigger missing deps callback
missingDeps3.len == 2 # Should be missing both msg1 and msg2
id1 in missingDeps3
id2 in missingDeps3
@ -141,12 +144,12 @@ suite "Reliability Mechanisms":
let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get())
check unwrapResult2.isOk()
let (_, missingDeps2) = unwrapResult2.get()
check:
missingDepsCount == 2 # Should have triggered another missing deps callback
missingDeps2.len == 1 # Should only be missing msg1
missingDepsCount == 2 # Should have triggered another missing deps callback
missingDeps2.len == 1 # Should only be missing msg1
id1 in missingDeps2
messageReadyCount == 0 # No messages should be ready yet
messageReadyCount == 0 # No messages should be ready yet
# Mark first dependency (msg1) as met
let markResult1 = rm.markDependenciesMet(@[id1])
@ -156,8 +159,8 @@ suite "Reliability Mechanisms":
check:
incomingBuffer.len == 0
messageReadyCount == 2 # Both msg2 and msg3 should be ready
missingDepsCount == 2 # Should still be 2 from the initial missing deps
messageReadyCount == 2 # Both msg2 and msg3 should be ready
missingDepsCount == 2 # Should still be 2 from the initial missing deps
test "acknowledgment via causal history":
var messageReadyCount = 0
@ -165,9 +168,12 @@ suite "Reliability Mechanisms":
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = missingDepsCount += 1
proc(messageId: MessageID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
missingDepsCount += 1,
)
# Send our message
@ -180,30 +186,34 @@ suite "Reliability Mechanisms":
let msg2 = Message(
messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1,
causalHistory: @[id1], # Include our message in causal history
causalHistory: @[id1], # Include our message in causal history
channelId: "testChannel",
content: @[byte(2)],
bloomFilter: @[] # Test with an empty bloom filter
bloomFilter: @[] # Test with an empty bloom filter
,
)
let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk()
# Process the "received" message - should trigger callbacks
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check:
messageReadyCount == 1 # For msg2 which we "received"
messageSentCount == 1 # For msg1 which was acknowledged via causal history
messageReadyCount == 1 # For msg2 which we "received"
messageSentCount == 1 # For msg1 which was acknowledged via causal history
test "acknowledgment via bloom filter":
var messageSentCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
discard,
)
# Send our message
@ -214,22 +224,20 @@ suite "Reliability Mechanisms":
# Create a message with bloom filter containing our message
var otherPartyBloomFilter = newRollingBloomFilter(
DefaultBloomFilterCapacity,
DefaultBloomFilterErrorRate,
DefaultBloomFilterWindow
DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate, DefaultBloomFilterWindow
)
otherPartyBloomFilter.add(id1)
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
check bfResult.isOk()
let msg2 = Message(
messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1,
causalHistory: @[], # Empty causal history as we're using bloom filter
causalHistory: @[], # Empty causal history as we're using bloom filter
channelId: "testChannel",
content: @[byte(2)],
bloomFilter: bfResult.get()
bloomFilter: bfResult.get(),
)
let serializedMsg2 = serializeMessage(msg2)
@ -237,8 +245,8 @@ suite "Reliability Mechanisms":
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check messageSentCount == 1 # Our message should be acknowledged via bloom filter
check messageSentCount == 1 # Our message should be acknowledged via bloom filter
# Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management":
@ -255,15 +263,18 @@ suite "Periodic Tasks & Buffer Management":
test "outgoing buffer management":
var messageSentCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
discard,
)
# Add multiple messages
for i in 0..5:
for i in 0 .. 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id)
@ -279,37 +290,41 @@ suite "Periodic Tasks & Buffer Management":
causalHistory: @["msg0", "msg2", "msg4"],
channelId: "testChannel",
content: @[byte(100)],
bloomFilter: @[]
bloomFilter: @[],
)
let serializedAck = serializeMessage(ackMsg)
check serializedAck.isOk()
# Process the acknowledgment
discard rm.unwrapReceivedMessage(serializedAck.get())
let finalBuffer = rm.getOutgoingBuffer()
check:
finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3 # Should have triggered sent callback for acknowledged messages
finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3
# Should have triggered sent callback for acknowledged messages
test "periodic buffer sweep and bloom clean":
var messageSentCount = 0
var config = defaultConfig()
config.resendInterval = initDuration(milliseconds = 100) # Short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window
config.resendInterval = initDuration(milliseconds = 100) # Short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window
config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager("testChannel", config)
check rmResultP.isOk()
let rm = rmResultP.get()
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID) {.gcsafe.} = messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
discard,
)
# First message - should be cleaned from bloom filter later
@ -324,10 +339,10 @@ suite "Periodic Tasks & Buffer Management":
rm.bloomFilter.contains(id1)
rm.startPeriodicTasks()
# Wait long enough for bloom filter window to pass and first message to exceed max retries
waitFor sleepAsync(chronos.milliseconds(500))
# Add new message
let msg2 = @[byte(2)]
let id2 = "msg2"
@ -336,27 +351,32 @@ suite "Periodic Tasks & Buffer Management":
let finalBuffer = rm.getOutgoingBuffer()
check:
finalBuffer.len == 1 # Only msg2 should be in buffer, msg1 should be removed after max retries
finalBuffer.len == 1
# Only msg2 should be in buffer, msg1 should be removed after max retries
finalBuffer[0].message.messageId == id2 # Verify it's the second message
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
rm.bloomFilter.contains(id2) # New message still in filter
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
rm.bloomFilter.contains(id2) # New message still in filter
rm.cleanup()
test "periodic sync callback":
var syncCallCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard,
proc() {.gcsafe.} = syncCallCount += 1
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
discard,
proc() {.gcsafe.} =
syncCallCount += 1,
)
rm.startPeriodicTasks()
waitFor sleepAsync(chronos.seconds(1))
rm.cleanup()
check syncCallCount > 0
# Special cases handling
@ -374,12 +394,12 @@ suite "Special Cases Handling":
test "message history limits":
# Add messages up to max history size
for i in 0..rm.config.maxMessageHistory + 5:
for i in 0 .. rm.config.maxMessageHistory + 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id)
check wrap.isOk()
let history = rm.getMessageHistory()
check:
history.len <= rm.config.maxMessageHistory
@ -392,7 +412,8 @@ suite "Special Cases Handling":
causalHistory: @[],
channelId: "testChannel",
content: @[byte(1)],
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
,
)
let serializedInvalid = serializeMessage(msgInvalid)
@ -402,14 +423,17 @@ suite "Special Cases Handling":
let result = rm.unwrapReceivedMessage(serializedInvalid.get())
check:
result.isOk()
result.get()[1].len == 0 # No missing dependencies
result.get()[1].len == 0 # No missing dependencies
test "duplicate message handling":
var messageReadyCount = 0
rm.setCallbacks(
proc(messageId: MessageID) {.gcsafe.} = messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} = discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = discard
proc(messageId: MessageID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: MessageID) {.gcsafe.} =
discard,
proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} =
discard,
)
# Create and process a message
@ -419,7 +443,7 @@ suite "Special Cases Handling":
causalHistory: @[],
channelId: "testChannel",
content: @[byte(1)],
bloomFilter: @[]
bloomFilter: @[],
)
let serialized = serializeMessage(msg)
@ -431,8 +455,8 @@ suite "Special Cases Handling":
let result2 = rm.unwrapReceivedMessage(serialized.get())
check:
result2.isOk()
result2.get()[1].len == 0 # No missing deps on second process
messageReadyCount == 1 # Message should only be processed once
result2.get()[1].len == 0 # No missing deps on second process
messageReadyCount == 1 # Message should only be processed once
test "error handling":
# Empty message
@ -466,4 +490,4 @@ suite "cleanup":
let history = rm.getMessageHistory()
check:
outBuffer.len == 0
history.len == 0
history.len == 0