feat: add rolling bloom filter, reliability utils and protobuf (#4)

This commit is contained in:
Akhil 2025-02-11 13:23:19 +05:30 committed by GitHub
parent 5df71ad3ea
commit 89160b58d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 566 additions and 147 deletions

3
.gitignore vendored
View File

@ -1,6 +1,7 @@
nimcache
nimcache/*
tests/bloom
tests/test_bloom
nim-bloom/bloom
.DS_Store
src/.DS_Store
nph

16
reliability.nimble Normal file
View File

@ -0,0 +1,16 @@
# Package
version = "0.1.0"
author = "Waku Team"
description = "E2E Reliability Protocol API"
license = "MIT"
srcDir = "src"
# Dependencies
requires "nim >= 2.0.8"
requires "chronicles"
requires "libp2p"
# Tasks
task test, "Run the test suite":
exec "nim c -r tests/test_bloom.nim"
exec "nim c -r tests/test_reliability.nim"

View File

@ -4,13 +4,12 @@ import strutils
import results
import private/probabilities
type
BloomFilter* = object
type BloomFilter* = object
capacity*: int
errorRate*: float
kHashes*: int
mBits*: int
intArray: seq[int]
intArray*: seq[int]
{.push overflowChecks: off.} # Turn off overflow checks for hashing operations
@ -31,8 +30,9 @@ proc hashN(item: string, n: int, maxValue: int): int =
{.pop.}
proc getMOverNBitsForK*(k: int, targetError: float,
probabilityTable = kErrors): Result[int, string] =
proc getMOverNBitsForK*(
k: int, targetError: float, probabilityTable = kErrors
): Result[int, string] =
## Returns the optimal number of m/n bits for a given k.
if k notin 0 .. 12:
return err("K must be <= 12 if forceNBitsPerElem is not also specified.")
@ -41,10 +41,13 @@ proc getMOverNBitsForK*(k: int, targetError: float,
if probabilityTable[k][mOverN] < targetError:
return ok(mOverN)
err("Specified value of k and error rate not achievable using less than 4 bytes / element.")
err(
"Specified value of k and error rate not achievable using less than 4 bytes / element."
)
proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
forceNBitsPerElem = 0): Result[BloomFilter, string] =
proc initializeBloomFilter*(
capacity: int, errorRate: float, k = 0, forceNBitsPerElem = 0
): Result[BloomFilter, string] =
## Initializes a Bloom filter with specified parameters.
##
## Parameters:
@ -76,21 +79,25 @@ proc initializeBloomFilter*(capacity: int, errorRate: float, k = 0,
mBits = capacity * nBitsPerElem
mInts = 1 + mBits div (sizeof(int) * 8)
ok(BloomFilter(
ok(
BloomFilter(
capacity: capacity,
errorRate: errorRate,
kHashes: kHashes,
mBits: mBits,
intArray: newSeq[int](mInts)
))
intArray: newSeq[int](mInts),
)
)
proc `$`*(bf: BloomFilter): string =
## Prints the configuration of the Bloom filter.
"Bloom filter with $1 capacity, $2 error rate, $3 hash functions, and requiring $4 bits of memory." %
[$bf.capacity,
[
$bf.capacity,
formatFloat(bf.errorRate, format = ffScientific, precision = 1),
$bf.kHashes,
$(bf.mBits div bf.capacity)]
$(bf.mBits div bf.capacity),
]
proc computeHashes(bf: BloomFilter, item: string): seq[int] =
var hashes = newSeq[int](bf.kHashes)

27
src/message.nim Normal file
View File

@ -0,0 +1,27 @@
import std/times
type
SdsMessageID* = seq[byte]
SdsChannelID* = seq[byte]
SdsMessage* = object
messageId*: SdsMessageID
lamportTimestamp*: int64
causalHistory*: seq[SdsMessageID]
channelId*: SdsChannelID
content*: seq[byte]
bloomFilter*: seq[byte]
UnacknowledgedMessage* = object
message*: SdsMessage
sendTime*: Time
resendAttempts*: int
const
DefaultMaxMessageHistory* = 1000
DefaultMaxCausalHistory* = 10
DefaultResendInterval* = initDuration(seconds = 60)
DefaultMaxResendAttempts* = 5
DefaultSyncMessageInterval* = initDuration(seconds = 30)
DefaultBufferSweepInterval* = initDuration(seconds = 60)
MaxMessageSize* = 1024 * 1024 # 1 MB

View File

@ -9,90 +9,93 @@ type
TErrorForK = seq[float]
TAllErrorRates* = array[0 .. 12, TErrorForK]
let kErrors*: TAllErrorRates = [
const kErrors*: TAllErrorRates = [
@[1.0],
@[1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000,
0.1540000000, 0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000,
0.0869000000, 0.0800000000, 0.0740000000, 0.0689000000, 0.0645000000,
0.0606000000, 0.0571000000, 0.0540000000, 0.0513000000, 0.0488000000,
0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000, 0.0392000000,
0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000,
0.0317000000, 0.0308000000],
@[1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000,
0.0804000000, 0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000,
0.0276000000, 0.0236000000, 0.0203000000, 0.0177000000, 0.0156000000,
0.0138000000, 0.0123000000, 0.0111000000, 0.0099800000, 0.0090600000,
0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000, 0.0059100000,
0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000,
0.0039000000, 0.0036700000],
@[1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000,
0.0423000000, 0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000,
0.0108000000, 0.0087500000, 0.0071800000, 0.0059600000, 0.0050000000,
0.0042300000, 0.0036200000, 0.0031200000, 0.0027000000, 0.0023600000,
0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000, 0.0012900000,
0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000,
0.0007170000],
@[1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000,
0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000,
0.0049200000, 0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000,
0.0015800000, 0.0013000000, 0.0010800000, 0.0009050000, 0.0007640000,
0.0006490000, 0.0005550000, 0.0004780000, 0.0004130000, 0.0003590000,
0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000, 0.0001910000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000,
0.0217000000, 0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000,
0.0033200000, 0.0024400000, 0.0018300000, 0.0013900000, 0.0010700000,
0.0008390000, 0.0006630000, 0.0005300000, 0.0004270000, 0.0003470000,
0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000, 0.0001380000,
0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000,
0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000,
0.0017900000, 0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000,
0.0003940000, 0.0003030000, 0.0002360000, 0.0001850000, 0.0001470000,
0.0001170000, 0.0000944000, 0.0000766000, 0.0000626000, 0.0000515000,
0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000],
@[1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000,
0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000,
0.0007020000, 0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000,
0.0001470000, 0.0001120000, 0.0000856000, 0.0000663000, 0.0000518000,
0.0000408000, 0.0000324000, 0.0000259000, 0.0000209000, 0.0000169000,
0.0000138000, 0.0000113000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
1.0, 0.0145000000, 0.0084600000, 0.0050900000, 0.0031400000, 0.0019900000,
0.0012900000, 0.0008520000, 0.0005740000, 0.0003940000, 0.0002750000,
0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000, 0.0000555000,
0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300],
@[1.0, 1.0, 1.0,
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000,
0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000,
0.0002260000, 0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000,
0.0000392000, 0.0000286000, 0.0000211000, 0.0000157000, 0.0000118000,
0.0000089600, 0.0000068500, 0.0000052800, 0.0000041000, 0.0000032000],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000,
0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000,
0.0001980000, 0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000,
0.0000297000, 0.0000211000, 0.0000152000, 0.0000110000, 0.0000080700,
0.0000059700, 0.0000044500, 0.0000033500, 0.0000025400, 0.0000019400],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0021000000, 0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000,
0.0001830000, 0.0001180000, 0.0000777000, 0.0000518000, 0.0000350000,
0.0000240000, 0.0000166000, 0.0000116000, 0.0000082300, 0.0000058900,
0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900, 0.0000012600],
@[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000,
0.0000712000, 0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000,
0.0000094200, 0.0000065200, 0.0000045600, 0.0000032200, 0.0000022900,
0.0000016500, 0.0000012000, 0.0000008740]
@[
1.0, 1.0, 0.3930000000, 0.2830000000, 0.2210000000, 0.1810000000, 0.1540000000,
0.1330000000, 0.1180000000, 0.1050000000, 0.0952000000, 0.0869000000, 0.0800000000,
0.0740000000, 0.0689000000, 0.0645000000, 0.0606000000, 0.0571000000, 0.0540000000,
0.0513000000, 0.0488000000, 0.0465000000, 0.0444000000, 0.0425000000, 0.0408000000,
0.0392000000, 0.0377000000, 0.0364000000, 0.0351000000, 0.0339000000, 0.0328000000,
0.0317000000, 0.0308000000,
],
@[
1.0, 1.0, 0.4000000000, 0.2370000000, 0.1550000000, 0.1090000000, 0.0804000000,
0.0618000000, 0.0489000000, 0.0397000000, 0.0329000000, 0.0276000000, 0.0236000000,
0.0203000000, 0.0177000000, 0.0156000000, 0.0138000000, 0.0123000000, 0.0111000000,
0.0099800000, 0.0090600000, 0.0082500000, 0.0075500000, 0.0069400000, 0.0063900000,
0.0059100000, 0.0054800000, 0.0051000000, 0.0047500000, 0.0044400000, 0.0041600000,
0.0039000000, 0.0036700000,
],
@[
1.0, 1.0, 1.0, 0.2530000000, 0.1470000000, 0.0920000000, 0.0609000000, 0.0423000000,
0.0306000000, 0.0228000000, 0.0174000000, 0.0136000000, 0.0108000000, 0.0087500000,
0.0071800000, 0.0059600000, 0.0050000000, 0.0042300000, 0.0036200000, 0.0031200000,
0.0027000000, 0.0023600000, 0.0020700000, 0.0018300000, 0.0016200000, 0.0014500000,
0.0012900000, 0.0011600000, 0.0010500000, 0.0009490000, 0.0008620000, 0.0007850000,
0.0007170000,
],
@[
1.0, 1.0, 1.0, 1.0, 0.1600000000, 0.0920000000, 0.0561000000, 0.0359000000,
0.0240000000, 0.0166000000, 0.0118000000, 0.0086400000, 0.0064600000, 0.0049200000,
0.0038100000, 0.0030000000, 0.0023900000, 0.0019300000, 0.0015800000, 0.0013000000,
0.0010800000, 0.0009050000, 0.0007640000, 0.0006490000, 0.0005550000, 0.0004780000,
0.0004130000, 0.0003590000, 0.0003140000, 0.0002760000, 0.0002430000, 0.0002150000,
0.0001910000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 0.1010000000, 0.0578000000, 0.0347000000, 0.0217000000,
0.0141000000, 0.0094300000, 0.0065000000, 0.0045900000, 0.0033200000, 0.0024400000,
0.0018300000, 0.0013900000, 0.0010700000, 0.0008390000, 0.0006630000, 0.0005300000,
0.0004270000, 0.0003470000, 0.0002850000, 0.0002350000, 0.0001960000, 0.0001640000,
0.0001380000, 0.0001170000, 0.0000996000, 0.0000853000, 0.0000733000, 0.0000633000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0638000000, 0.0364000000, 0.0216000000,
0.0133000000, 0.0084400000, 0.0055200000, 0.0037100000, 0.0025500000, 0.0017900000,
0.0012800000, 0.0009350000, 0.0006920000, 0.0005190000, 0.0003940000, 0.0003030000,
0.0002360000, 0.0001850000, 0.0001470000, 0.0001170000, 0.0000944000, 0.0000766000,
0.0000626000, 0.0000515000, 0.0000426000, 0.0000355000, 0.0000297000, 0.0000250000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0229000000, 0.0135000000, 0.0081900000,
0.0051300000, 0.0032900000, 0.0021700000, 0.0014600000, 0.0010000000, 0.0007020000,
0.0004990000, 0.0003600000, 0.0002640000, 0.0001960000, 0.0001470000, 0.0001120000,
0.0000856000, 0.0000663000, 0.0000518000, 0.0000408000, 0.0000324000, 0.0000259000,
0.0000209000, 0.0000169000, 0.0000138000, 0.0000113000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0145000000, 0.0084600000,
0.0050900000, 0.0031400000, 0.0019900000, 0.0012900000, 0.0008520000, 0.0005740000,
0.0003940000, 0.0002750000, 0.0001940000, 0.0001400000, 0.0001010000, 0.0000746000,
0.0000555000, 0.0000417000, 0.0000316000, 0.0000242000, 0.0000187000, 0.0000146000,
0.0000114000, 0.0000090100, 0.0000071600, 0.0000057300,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0053100000, 0.0031700000,
0.0019400000, 0.0012100000, 0.0007750000, 0.0005050000, 0.0003350000, 0.0002260000,
0.0001550000, 0.0001080000, 0.0000759000, 0.0000542000, 0.0000392000, 0.0000286000,
0.0000211000, 0.0000157000, 0.0000118000, 0.0000089600, 0.0000068500, 0.0000052800,
0.0000041000, 0.0000032000,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0033400000,
0.0019800000, 0.0012000000, 0.0007440000, 0.0004700000, 0.0003020000, 0.0001980000,
0.0001320000, 0.0000889000, 0.0000609000, 0.0000423000, 0.0000297000, 0.0000211000,
0.0000152000, 0.0000110000, 0.0000080700, 0.0000059700, 0.0000044500, 0.0000033500,
0.0000025400, 0.0000019400,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0021000000,
0.0012400000, 0.0007470000, 0.0004590000, 0.0002870000, 0.0001830000, 0.0001180000,
0.0000777000, 0.0000518000, 0.0000350000, 0.0000240000, 0.0000166000, 0.0000116000,
0.0000082300, 0.0000058900, 0.0000042500, 0.0000031000, 0.0000022800, 0.0000016900,
0.0000012600,
],
@[
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
0.0007780000, 0.0004660000, 0.0002840000, 0.0001760000, 0.0001110000, 0.0000712000,
0.0000463000, 0.0000305000, 0.0000204000, 0.0000138000, 0.0000094200, 0.0000065200,
0.0000045600, 0.0000032200, 0.0000022900, 0.0000016500, 0.0000012000, 0.0000008740,
],
]

114
src/protobuf.nim Normal file
View File

@ -0,0 +1,114 @@
import libp2p/protobuf/minprotobuf
import std/options
import endians
import ../src/[message, protobufutil, bloom, reliability_utils]
proc encode*(msg: SdsMessage): ProtoBuffer =
var pb = initProtoBuffer()
pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory:
pb.write(3, hist)
pb.write(4, msg.channelId)
pb.write(5, msg.content)
pb.write(6, msg.bloomFilter)
pb.finish()
pb
proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var msg = SdsMessage()
if not ?pb.getField(1, msg.messageId):
return err(ProtobufError.missingRequiredField("messageId"))
var timestamp: uint64
if not ?pb.getField(2, timestamp):
return err(ProtobufError.missingRequiredField("lamportTimestamp"))
msg.lamportTimestamp = int64(timestamp)
var causalHistory: seq[seq[byte]]
let histResult = pb.getRepeatedField(3, causalHistory)
if histResult.isOk:
msg.causalHistory = causalHistory
if not ?pb.getField(4, msg.channelId):
return err(ProtobufError.missingRequiredField("channelId"))
if not ?pb.getField(5, msg.content):
return err(ProtobufError.missingRequiredField("content"))
if not ?pb.getField(6, msg.bloomFilter):
msg.bloomFilter = @[] # Empty if not present
ok(msg)
proc serializeMessage*(msg: SdsMessage): Result[seq[byte], ReliabilityError] =
let pb = encode(msg)
ok(pb.buffer)
proc deserializeMessage*(data: seq[byte]): Result[SdsMessage, ReliabilityError] =
let msg = SdsMessage.decode(data).valueOr:
return err(ReliabilityError.reDeserializationError)
ok(msg)
proc serializeBloomFilter*(filter: BloomFilter): Result[seq[byte], ReliabilityError] =
var pb = initProtoBuffer()
# Convert intArray to bytes
try:
var bytes = newSeq[byte](filter.intArray.len * sizeof(int))
for i, val in filter.intArray:
var leVal: int
littleEndian64(addr leVal, unsafeAddr val)
let start = i * sizeof(int)
copyMem(addr bytes[start], addr leVal, sizeof(int))
pb.write(1, bytes)
pb.write(2, uint64(filter.capacity))
pb.write(3, uint64(filter.errorRate * 1_000_000))
pb.write(4, uint64(filter.kHashes))
pb.write(5, uint64(filter.mBits))
except:
return err(ReliabilityError.reSerializationError)
pb.finish()
ok(pb.buffer)
proc deserializeBloomFilter*(data: seq[byte]): Result[BloomFilter, ReliabilityError] =
if data.len == 0:
return err(ReliabilityError.reDeserializationError)
let pb = initProtoBuffer(data)
var bytes: seq[byte]
var cap, errRate, kHashes, mBits: uint64
try:
if not pb.getField(1, bytes).get() or not pb.getField(2, cap).get() or
not pb.getField(3, errRate).get() or not pb.getField(4, kHashes).get() or
not pb.getField(5, mBits).get():
return err(ReliabilityError.reDeserializationError)
# Convert bytes back to intArray
var intArray = newSeq[int](bytes.len div sizeof(int))
for i in 0 ..< intArray.len:
var leVal: int
let start = i * sizeof(int)
copyMem(addr leVal, unsafeAddr bytes[start], sizeof(int))
littleEndian64(addr intArray[i], addr leVal)
ok(
BloomFilter(
intArray: intArray,
capacity: int(cap),
errorRate: float(errRate) / 1_000_000,
kHashes: int(kHashes),
mBits: int(mBits),
)
)
except:
return err(ReliabilityError.reDeserializationError)

32
src/protobufutil.nim Normal file
View File

@ -0,0 +1,32 @@
# adapted from https://github.com/waku-org/nwaku/blob/master/waku/common/protobuf.nim
{.push raises: [].}
import libp2p/protobuf/minprotobuf
import libp2p/varint
export minprotobuf, varint
type
ProtobufErrorKind* {.pure.} = enum
DecodeFailure
MissingRequiredField
ProtobufError* = object
case kind*: ProtobufErrorKind
of DecodeFailure:
error*: minprotobuf.ProtoError
of MissingRequiredField:
field*: string
ProtobufResult*[T] = Result[T, ProtobufError]
converter toProtobufError*(err: minprotobuf.ProtoError): ProtobufError =
case err
of minprotobuf.ProtoError.RequiredFieldMissing:
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: "unknown")
else:
ProtobufError(kind: ProtobufErrorKind.DecodeFailure, error: err)
proc missingRequiredField*(T: type ProtobufError, field: string): T =
ProtobufError(kind: ProtobufErrorKind.MissingRequiredField, field: field)

97
src/reliability_utils.nim Normal file
View File

@ -0,0 +1,97 @@
import std/[times, locks]
import chronicles
import ./[rolling_bloom_filter, message]
type
PeriodicSyncCallback* = proc() {.gcsafe, raises: [].}
ReliabilityConfig* = object
bloomFilterCapacity*: int
bloomFilterErrorRate*: float
maxMessageHistory*: int
maxCausalHistory*: int
resendInterval*: Duration
maxResendAttempts*: int
syncMessageInterval*: Duration
bufferSweepInterval*: Duration
ReliabilityManager* = ref object
lamportTimestamp*: int64
messageHistory*: seq[SdsMessageID]
bloomFilter*: RollingBloomFilter
outgoingBuffer*: seq[UnacknowledgedMessage]
incomingBuffer*: seq[SdsMessage]
channelId*: SdsChannelID
config*: ReliabilityConfig
lock*: Lock
onMessageReady*: proc(messageId: SdsMessageID) {.gcsafe.}
onMessageSent*: proc(messageId: SdsMessageID) {.gcsafe.}
onMissingDependencies*:
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback
ReliabilityError* {.pure.} = enum
reInvalidArgument
reOutOfMemory
reInternalError
reSerializationError
reDeserializationError
reMessageTooLarge
proc defaultConfig*(): ReliabilityConfig =
## Creates a default configuration for the ReliabilityManager.
##
## Returns:
## A ReliabilityConfig object with default values.
ReliabilityConfig(
bloomFilterCapacity: DefaultBloomFilterCapacity,
bloomFilterErrorRate: DefaultBloomFilterErrorRate,
maxMessageHistory: DefaultMaxMessageHistory,
maxCausalHistory: DefaultMaxCausalHistory,
resendInterval: DefaultResendInterval,
maxResendAttempts: DefaultMaxResendAttempts,
syncMessageInterval: DefaultSyncMessageInterval,
bufferSweepInterval: DefaultBufferSweepInterval,
)
proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
if not rm.isNil():
try:
withLock rm.lock:
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.messageHistory.setLen(0)
except Exception:
error "Error during cleanup", error = getCurrentExceptionMsg()
proc cleanBloomFilter*(rm: ReliabilityManager) {.gcsafe, raises: [].} =
withLock rm.lock:
try:
rm.bloomFilter.clean()
except Exception:
error "Failed to clean bloom filter", error = getCurrentExceptionMsg()
proc addToHistory*(rm: ReliabilityManager, msgId: SdsMessageID) {.gcsafe, raises: [].} =
rm.messageHistory.add(msgId)
if rm.messageHistory.len > rm.config.maxMessageHistory:
rm.messageHistory.delete(0)
proc updateLamportTimestamp*(
rm: ReliabilityManager, msgTs: int64
) {.gcsafe, raises: [].} =
rm.lamportTimestamp = max(msgTs, rm.lamportTimestamp) + 1
proc getRecentSdsMessageIDs*(rm: ReliabilityManager, n: int): seq[SdsMessageID] =
result = rm.messageHistory[max(0, rm.messageHistory.len - n) .. ^1]
proc getMessageHistory*(rm: ReliabilityManager): seq[SdsMessageID] =
withLock rm.lock:
result = rm.messageHistory
proc getOutgoingBuffer*(rm: ReliabilityManager): seq[UnacknowledgedMessage] =
withLock rm.lock:
result = rm.outgoingBuffer
proc getIncomingBuffer*(rm: ReliabilityManager): seq[SdsMessage] =
withLock rm.lock:
result = rm.incomingBuffer

View File

@ -0,0 +1,118 @@
import chronos
import chronicles
import ./[bloom, message]
type RollingBloomFilter* = object
filter*: BloomFilter
capacity*: int
minCapacity*: int
maxCapacity*: int
messages*: seq[SdsMessageID]
const
DefaultBloomFilterCapacity* = 10000
DefaultBloomFilterErrorRate* = 0.001
CapacityFlexPercent* = 20
proc newRollingBloomFilter*(
capacity: int = DefaultBloomFilterCapacity,
errorRate: float = DefaultBloomFilterErrorRate,
): RollingBloomFilter {.gcsafe.} =
let targetCapacity = if capacity <= 0: DefaultBloomFilterCapacity else: capacity
let targetError =
if errorRate <= 0.0 or errorRate >= 1.0: DefaultBloomFilterErrorRate else: errorRate
let filterResult = initializeBloomFilter(targetCapacity, targetError)
if filterResult.isErr:
error "Failed to initialize bloom filter", error = filterResult.error
# Try with default values if custom values failed
if capacity != DefaultBloomFilterCapacity or errorRate != DefaultBloomFilterErrorRate:
let defaultResult =
initializeBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
if defaultResult.isErr:
error "Failed to initialize bloom filter with default parameters",
error = defaultResult.error
let minCapacity = (
DefaultBloomFilterCapacity.float * (100 - CapacityFlexPercent).float / 100.0
).int
let maxCapacity = (
DefaultBloomFilterCapacity.float * (100 + CapacityFlexPercent).float / 100.0
).int
info "Successfully initialized bloom filter with default parameters",
capacity = DefaultBloomFilterCapacity,
minCapacity = minCapacity,
maxCapacity = maxCapacity
return RollingBloomFilter(
filter: defaultResult.get(),
capacity: DefaultBloomFilterCapacity,
minCapacity: minCapacity,
maxCapacity: maxCapacity,
messages: @[],
)
else:
error "Could not create bloom filter", error = filterResult.error
let minCapacity =
(targetCapacity.float * (100 - CapacityFlexPercent).float / 100.0).int
let maxCapacity =
(targetCapacity.float * (100 + CapacityFlexPercent).float / 100.0).int
info "Successfully initialized bloom filter",
capacity = targetCapacity, minCapacity = minCapacity, maxCapacity = maxCapacity
return RollingBloomFilter(
filter: filterResult.get(),
capacity: targetCapacity,
minCapacity: minCapacity,
maxCapacity: maxCapacity,
messages: @[],
)
proc clean*(rbf: var RollingBloomFilter) {.gcsafe.} =
try:
if rbf.messages.len <= rbf.maxCapacity:
return # Don't clean unless we exceed max capacity
# Initialize new filter
var newFilter = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate).valueOr:
error "Failed to create new bloom filter", error = $error
return
# Keep most recent messages up to minCapacity
let keepCount = rbf.minCapacity
let startIdx = max(0, rbf.messages.len - keepCount)
var newMessages: seq[SdsMessageID] = @[]
for i in startIdx ..< rbf.messages.len:
newMessages.add(rbf.messages[i])
newFilter.insert(cast[string](rbf.messages[i]))
rbf.messages = newMessages
rbf.filter = newFilter
except Exception:
error "Failed to clean bloom filter", error = getCurrentExceptionMsg()
proc add*(rbf: var RollingBloomFilter, messageId: SdsMessageID) {.gcsafe.} =
## Adds a message ID to the rolling bloom filter.
##
## Parameters:
## - messageId: The ID of the message to add.
rbf.filter.insert(cast[string](messageId))
rbf.messages.add(messageId)
# Clean if we exceed max capacity
if rbf.messages.len > rbf.maxCapacity:
rbf.clean()
proc contains*(rbf: RollingBloomFilter, messageId: SdsMessageID): bool =
## Checks if a message ID is in the rolling bloom filter.
##
## Parameters:
## - messageId: The ID of the message to check.
##
## Returns:
## True if the message ID is probably in the filter, false otherwise.
rbf.filter.lookup(cast[string](messageId))

View File

@ -1,6 +1,7 @@
import unittest, results, strutils
import ../src/bloom
from random import rand, randomize
import ../src/[message, protobuf, protobufutil, reliability_utils, rolling_bloom_filter]
suite "bloom filter":
setup:
@ -62,12 +63,14 @@ suite "bloom filter":
# Test error case for k > 12
let errorCase = getMOverNBitsForK(k = 13, targetError = 0.01)
check errorCase.isErr
check errorCase.error == "K must be <= 12 if forceNBitsPerElem is not also specified."
check errorCase.error ==
"K must be <= 12 if forceNBitsPerElem is not also specified."
# Test error case for unachievable error rate
let errorCase2 = getMOverNBitsForK(k = 2, targetError = 0.00001)
check errorCase2.isErr
check errorCase2.error == "Specified value of k and error rate not achievable using less than 4 bytes / element."
check errorCase2.error ==
"Specified value of k and error rate not achievable using less than 4 bytes / element."
# Test success cases
let case1 = getMOverNBitsForK(k = 2, targetError = 0.1)
@ -100,12 +103,13 @@ suite "bloom filter":
suite "bloom filter special cases":
test "different patterns of strings":
const testSize = 10_000
let patterns = @[
let patterns =
@[
"shortstr",
repeat("a", 1000), # Very long string
"special@#$%^&*()", # Special characters
"unicode→★∑≈", # Unicode characters
repeat("pattern", 10) # Repeating pattern
repeat("pattern", 10), # Repeating pattern
]
let bfResult = initializeBloomFilter(testSize, 0.01)